async_nats/client.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16
17use crate::connection::State;
18use crate::subject::ToSubject;
19use crate::{PublishMessage, ServerInfo};
20
21use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
22use crate::error::Error;
23use bytes::Bytes;
24use futures::future::TryFutureExt;
25use futures::{Sink, SinkExt as _, StreamExt};
26use once_cell::sync::Lazy;
27use portable_atomic::AtomicU64;
28use regex::Regex;
29use std::fmt::Display;
30use std::sync::atomic::{AtomicUsize, Ordering};
31use std::sync::Arc;
32use std::time::Duration;
33use thiserror::Error;
34use tokio::sync::{mpsc, oneshot};
35use tokio_util::sync::PollSender;
36use tracing::trace;
37
38static VERSION_RE: Lazy<Regex> =
39 Lazy::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
40
41/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
42/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
43pub type PublishError = Error<PublishErrorKind>;
44
45impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
46 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
47 PublishError::with_source(PublishErrorKind::Send, err)
48 }
49}
50
51impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
52 fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
53 PublishError::with_source(PublishErrorKind::Send, err)
54 }
55}
56
57#[derive(Copy, Clone, Debug, PartialEq)]
58pub enum PublishErrorKind {
59 MaxPayloadExceeded,
60 Send,
61}
62
63impl Display for PublishErrorKind {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 match self {
66 PublishErrorKind::MaxPayloadExceeded => write!(f, "max payload size exceeded"),
67 PublishErrorKind::Send => write!(f, "failed to send message"),
68 }
69 }
70}
71
72/// Client is a `Cloneable` handle to NATS connection.
73/// Client should not be created directly. Instead, one of two methods can be used:
74/// [crate::connect] and [crate::ConnectOptions::connect]
75#[derive(Clone, Debug)]
76pub struct Client {
77 info: tokio::sync::watch::Receiver<ServerInfo>,
78 pub(crate) state: tokio::sync::watch::Receiver<State>,
79 pub(crate) sender: mpsc::Sender<Command>,
80 poll_sender: PollSender<Command>,
81 next_subscription_id: Arc<AtomicU64>,
82 subscription_capacity: usize,
83 inbox_prefix: Arc<str>,
84 request_timeout: Option<Duration>,
85 max_payload: Arc<AtomicUsize>,
86 connection_stats: Arc<Statistics>,
87}
88
89impl Sink<PublishMessage> for Client {
90 type Error = PublishError;
91
92 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
93 self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
94 }
95
96 fn start_send(mut self: Pin<&mut Self>, msg: PublishMessage) -> Result<(), Self::Error> {
97 self.poll_sender
98 .start_send_unpin(Command::Publish(msg))
99 .map_err(Into::into)
100 }
101
102 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
103 self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
104 }
105
106 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
107 self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
108 }
109}
110
111impl Client {
112 #[allow(clippy::too_many_arguments)]
113 pub(crate) fn new(
114 info: tokio::sync::watch::Receiver<ServerInfo>,
115 state: tokio::sync::watch::Receiver<State>,
116 sender: mpsc::Sender<Command>,
117 capacity: usize,
118 inbox_prefix: String,
119 request_timeout: Option<Duration>,
120 max_payload: Arc<AtomicUsize>,
121 statistics: Arc<Statistics>,
122 ) -> Client {
123 let poll_sender = PollSender::new(sender.clone());
124 Client {
125 info,
126 state,
127 sender,
128 poll_sender,
129 next_subscription_id: Arc::new(AtomicU64::new(1)),
130 subscription_capacity: capacity,
131 inbox_prefix: inbox_prefix.into(),
132 request_timeout,
133 max_payload,
134 connection_stats: statistics,
135 }
136 }
137
138 /// Returns the default timeout for requests set when creating the client.
139 ///
140 /// # Examples
141 /// ```no_run
142 /// # #[tokio::main]
143 /// # async fn main() -> Result<(), async_nats::Error> {
144 /// let client = async_nats::connect("demo.nats.io").await?;
145 /// println!("default request timeout: {:?}", client.timeout());
146 /// # Ok(())
147 /// # }
148 /// ```
149 pub fn timeout(&self) -> Option<Duration> {
150 self.request_timeout
151 }
152
153 /// Returns last received info from the server.
154 ///
155 /// # Examples
156 ///
157 /// ```no_run
158 /// # #[tokio::main]
159 /// # async fn main () -> Result<(), async_nats::Error> {
160 /// let client = async_nats::connect("demo.nats.io").await?;
161 /// println!("info: {:?}", client.server_info());
162 /// # Ok(())
163 /// # }
164 /// ```
165 pub fn server_info(&self) -> ServerInfo {
166 // We ignore notifying the watcher, as that requires mutable client reference.
167 self.info.borrow().to_owned()
168 }
169
170 /// Returns true if the server version is compatible with the version components.
171 ///
172 /// This has to be used with caution, as it is not guaranteed that the server
173 /// that client is connected to is the same version that the one that is
174 /// a JetStream meta/stream/consumer leader, especially across leafnodes.
175 ///
176 /// # Examples
177 ///
178 /// ```no_run
179 /// # #[tokio::main]
180 /// # async fn main() -> Result<(), async_nats::Error> {
181 /// let client = async_nats::connect("demo.nats.io").await?;
182 /// assert!(client.is_server_compatible(2, 8, 4));
183 /// # Ok(())
184 /// # }
185 /// ```
186 pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
187 let info = self.server_info();
188
189 let server_version_captures = match VERSION_RE.captures(&info.version) {
190 Some(captures) => captures,
191 None => return false,
192 };
193
194 let server_major = server_version_captures
195 .get(1)
196 .map(|m| m.as_str().parse::<i64>().unwrap())
197 .unwrap();
198
199 let server_minor = server_version_captures
200 .get(2)
201 .map(|m| m.as_str().parse::<i64>().unwrap())
202 .unwrap();
203
204 let server_patch = server_version_captures
205 .get(3)
206 .map(|m| m.as_str().parse::<i64>().unwrap())
207 .unwrap();
208
209 if server_major < major
210 || (server_major == major && server_minor < minor)
211 || (server_major == major && server_minor == minor && server_patch < patch)
212 {
213 return false;
214 }
215 true
216 }
217
218 /// Publish a [Message] to a given subject.
219 ///
220 /// # Examples
221 /// ```no_run
222 /// # #[tokio::main]
223 /// # async fn main() -> Result<(), async_nats::Error> {
224 /// let client = async_nats::connect("demo.nats.io").await?;
225 /// client.publish("events.data", "payload".into()).await?;
226 /// # Ok(())
227 /// # }
228 /// ```
229 pub async fn publish<S: ToSubject>(
230 &self,
231 subject: S,
232 payload: Bytes,
233 ) -> Result<(), PublishError> {
234 let subject = subject.to_subject();
235 let max_payload = self.max_payload.load(Ordering::Relaxed);
236 if payload.len() > max_payload {
237 return Err(PublishError::with_source(
238 PublishErrorKind::MaxPayloadExceeded,
239 format!(
240 "Payload size limit of {} exceeded by message size of {}",
241 payload.len(),
242 max_payload
243 ),
244 ));
245 }
246
247 self.sender
248 .send(Command::Publish(PublishMessage {
249 subject,
250 payload,
251 reply: None,
252 headers: None,
253 }))
254 .await?;
255 Ok(())
256 }
257
258 /// Publish a [Message] with headers to a given subject.
259 ///
260 /// # Examples
261 /// ```
262 /// # #[tokio::main]
263 /// # async fn main() -> Result<(), async_nats::Error> {
264 /// use std::str::FromStr;
265 /// let client = async_nats::connect("demo.nats.io").await?;
266 /// let mut headers = async_nats::HeaderMap::new();
267 /// headers.insert(
268 /// "X-Header",
269 /// async_nats::HeaderValue::from_str("Value").unwrap(),
270 /// );
271 /// client
272 /// .publish_with_headers("events.data", headers, "payload".into())
273 /// .await?;
274 /// # Ok(())
275 /// # }
276 /// ```
277 pub async fn publish_with_headers<S: ToSubject>(
278 &self,
279 subject: S,
280 headers: HeaderMap,
281 payload: Bytes,
282 ) -> Result<(), PublishError> {
283 let subject = subject.to_subject();
284
285 self.sender
286 .send(Command::Publish(PublishMessage {
287 subject,
288 payload,
289 reply: None,
290 headers: Some(headers),
291 }))
292 .await?;
293 Ok(())
294 }
295
296 /// Publish a [Message] to a given subject, with specified response subject
297 /// to which the subscriber can respond.
298 /// This method does not await for the response.
299 ///
300 /// # Examples
301 ///
302 /// ```no_run
303 /// # #[tokio::main]
304 /// # async fn main() -> Result<(), async_nats::Error> {
305 /// let client = async_nats::connect("demo.nats.io").await?;
306 /// client
307 /// .publish_with_reply("events.data", "reply_subject", "payload".into())
308 /// .await?;
309 /// # Ok(())
310 /// # }
311 /// ```
312 pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
313 &self,
314 subject: S,
315 reply: R,
316 payload: Bytes,
317 ) -> Result<(), PublishError> {
318 let subject = subject.to_subject();
319 let reply = reply.to_subject();
320
321 self.sender
322 .send(Command::Publish(PublishMessage {
323 subject,
324 payload,
325 reply: Some(reply),
326 headers: None,
327 }))
328 .await?;
329 Ok(())
330 }
331
332 /// Publish a [Message] to a given subject with headers and specified response subject
333 /// to which the subscriber can respond.
334 /// This method does not await for the response.
335 ///
336 /// # Examples
337 ///
338 /// ```no_run
339 /// # #[tokio::main]
340 /// # async fn main() -> Result<(), async_nats::Error> {
341 /// use std::str::FromStr;
342 /// let client = async_nats::connect("demo.nats.io").await?;
343 /// let mut headers = async_nats::HeaderMap::new();
344 /// client
345 /// .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
346 /// .await?;
347 /// # Ok(())
348 /// # }
349 /// ```
350 pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
351 &self,
352 subject: S,
353 reply: R,
354 headers: HeaderMap,
355 payload: Bytes,
356 ) -> Result<(), PublishError> {
357 let subject = subject.to_subject();
358 let reply = reply.to_subject();
359
360 self.sender
361 .send(Command::Publish(PublishMessage {
362 subject,
363 payload,
364 reply: Some(reply),
365 headers: Some(headers),
366 }))
367 .await?;
368 Ok(())
369 }
370
371 /// Sends the request with headers.
372 ///
373 /// # Examples
374 /// ```no_run
375 /// # #[tokio::main]
376 /// # async fn main() -> Result<(), async_nats::Error> {
377 /// let client = async_nats::connect("demo.nats.io").await?;
378 /// let response = client.request("service", "data".into()).await?;
379 /// # Ok(())
380 /// # }
381 /// ```
382 pub async fn request<S: ToSubject>(
383 &self,
384 subject: S,
385 payload: Bytes,
386 ) -> Result<Message, RequestError> {
387 let subject = subject.to_subject();
388
389 trace!(
390 "request sent to subject: {} ({})",
391 subject.as_ref(),
392 payload.len()
393 );
394 let request = Request::new().payload(payload);
395 self.send_request(subject, request).await
396 }
397
398 /// Sends the request with headers.
399 ///
400 /// # Examples
401 /// ```no_run
402 /// # #[tokio::main]
403 /// # async fn main() -> Result<(), async_nats::Error> {
404 /// let client = async_nats::connect("demo.nats.io").await?;
405 /// let mut headers = async_nats::HeaderMap::new();
406 /// headers.insert("Key", "Value");
407 /// let response = client
408 /// .request_with_headers("service", headers, "data".into())
409 /// .await?;
410 /// # Ok(())
411 /// # }
412 /// ```
413 pub async fn request_with_headers<S: ToSubject>(
414 &self,
415 subject: S,
416 headers: HeaderMap,
417 payload: Bytes,
418 ) -> Result<Message, RequestError> {
419 let subject = subject.to_subject();
420
421 let request = Request::new().headers(headers).payload(payload);
422 self.send_request(subject, request).await
423 }
424
425 /// Sends the request created by the [Request].
426 ///
427 /// # Examples
428 ///
429 /// ```no_run
430 /// # #[tokio::main]
431 /// # async fn main() -> Result<(), async_nats::Error> {
432 /// let client = async_nats::connect("demo.nats.io").await?;
433 /// let request = async_nats::Request::new().payload("data".into());
434 /// let response = client.send_request("service", request).await?;
435 /// # Ok(())
436 /// # }
437 /// ```
438 pub async fn send_request<S: ToSubject>(
439 &self,
440 subject: S,
441 request: Request,
442 ) -> Result<Message, RequestError> {
443 let subject = subject.to_subject();
444
445 if let Some(inbox) = request.inbox {
446 let timeout = request.timeout.unwrap_or(self.request_timeout);
447 let mut subscriber = self.subscribe(inbox.clone()).await?;
448 let payload: Bytes = request.payload.unwrap_or_default();
449 match request.headers {
450 Some(headers) => {
451 self.publish_with_reply_and_headers(subject, inbox, headers, payload)
452 .await?
453 }
454 None => self.publish_with_reply(subject, inbox, payload).await?,
455 }
456 let request = match timeout {
457 Some(timeout) => {
458 tokio::time::timeout(timeout, subscriber.next())
459 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
460 .await?
461 }
462 None => subscriber.next().await,
463 };
464 match request {
465 Some(message) => {
466 if message.status == Some(StatusCode::NO_RESPONDERS) {
467 return Err(RequestError::with_source(
468 RequestErrorKind::NoResponders,
469 "no responders",
470 ));
471 }
472 Ok(message)
473 }
474 None => Err(RequestError::with_source(
475 RequestErrorKind::Other,
476 "broken pipe",
477 )),
478 }
479 } else {
480 let (sender, receiver) = oneshot::channel();
481
482 let payload = request.payload.unwrap_or_default();
483 let respond = self.new_inbox().into();
484 let headers = request.headers;
485
486 self.sender
487 .send(Command::Request {
488 subject,
489 payload,
490 respond,
491 headers,
492 sender,
493 })
494 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
495 .await?;
496
497 let timeout = request.timeout.unwrap_or(self.request_timeout);
498 let request = match timeout {
499 Some(timeout) => {
500 tokio::time::timeout(timeout, receiver)
501 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
502 .await?
503 }
504 None => receiver.await,
505 };
506
507 match request {
508 Ok(message) => {
509 if message.status == Some(StatusCode::NO_RESPONDERS) {
510 return Err(RequestError::with_source(
511 RequestErrorKind::NoResponders,
512 "no responders",
513 ));
514 }
515 Ok(message)
516 }
517 Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
518 }
519 }
520 }
521
522 /// Create a new globally unique inbox which can be used for replies.
523 ///
524 /// # Examples
525 ///
526 /// ```no_run
527 /// # #[tokio::main]
528 /// # async fn main() -> Result<(), async_nats::Error> {
529 /// # let mut nc = async_nats::connect("demo.nats.io").await?;
530 /// let reply = nc.new_inbox();
531 /// let rsub = nc.subscribe(reply).await?;
532 /// # Ok(())
533 /// # }
534 /// ```
535 pub fn new_inbox(&self) -> String {
536 format!("{}.{}", self.inbox_prefix, nuid::next())
537 }
538
539 /// Subscribes to a subject to receive [messages][Message].
540 ///
541 /// # Examples
542 ///
543 /// ```no_run
544 /// # #[tokio::main]
545 /// # async fn main() -> Result<(), async_nats::Error> {
546 /// use futures::StreamExt;
547 /// let client = async_nats::connect("demo.nats.io").await?;
548 /// let mut subscription = client.subscribe("events.>").await?;
549 /// while let Some(message) = subscription.next().await {
550 /// println!("received message: {:?}", message);
551 /// }
552 /// # Ok(())
553 /// # }
554 /// ```
555 pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
556 let subject = subject.to_subject();
557 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
558 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
559
560 self.sender
561 .send(Command::Subscribe {
562 sid,
563 subject,
564 queue_group: None,
565 sender,
566 })
567 .await?;
568
569 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
570 }
571
572 /// Subscribes to a subject with a queue group to receive [messages][Message].
573 ///
574 /// # Examples
575 ///
576 /// ```no_run
577 /// # #[tokio::main]
578 /// # async fn main() -> Result<(), async_nats::Error> {
579 /// use futures::StreamExt;
580 /// let client = async_nats::connect("demo.nats.io").await?;
581 /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
582 /// while let Some(message) = subscription.next().await {
583 /// println!("received message: {:?}", message);
584 /// }
585 /// # Ok(())
586 /// # }
587 /// ```
588 pub async fn queue_subscribe<S: ToSubject>(
589 &self,
590 subject: S,
591 queue_group: String,
592 ) -> Result<Subscriber, SubscribeError> {
593 let subject = subject.to_subject();
594
595 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
596 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
597
598 self.sender
599 .send(Command::Subscribe {
600 sid,
601 subject,
602 queue_group: Some(queue_group),
603 sender,
604 })
605 .await?;
606
607 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
608 }
609
610 /// Flushes the internal buffer ensuring that all messages are sent.
611 ///
612 /// # Examples
613 ///
614 /// ```no_run
615 /// # #[tokio::main]
616 /// # async fn main() -> Result<(), async_nats::Error> {
617 /// let client = async_nats::connect("demo.nats.io").await?;
618 /// client.flush().await?;
619 /// # Ok(())
620 /// # }
621 /// ```
622 pub async fn flush(&self) -> Result<(), FlushError> {
623 let (tx, rx) = tokio::sync::oneshot::channel();
624 self.sender
625 .send(Command::Flush { observer: tx })
626 .await
627 .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
628
629 rx.await
630 .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
631 Ok(())
632 }
633
634 /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
635 /// messages, then closes the connection. Once completed, any associated streams associated with the
636 /// client will be closed, and further client commands will fail
637 ///
638 /// # Examples
639 ///
640 /// ```no_run
641 /// # #[tokio::main]
642 /// # async fn main() -> Result<(), async_nats::Error> {
643 /// use futures::StreamExt;
644 /// let client = async_nats::connect("demo.nats.io").await?;
645 /// let mut subscription = client.subscribe("events.>").await?;
646 ///
647 /// client.drain().await?;
648 ///
649 /// # // existing subscriptions are closed and further commands will fail
650 /// assert!(subscription.next().await.is_none());
651 /// client
652 /// .subscribe("events.>")
653 /// .await
654 /// .expect_err("Expected further commands to fail");
655 ///
656 /// # Ok(())
657 /// # }
658 /// ```
659 pub async fn drain(&self) -> Result<(), DrainError> {
660 // Drain all subscriptions
661 self.sender.send(Command::Drain { sid: None }).await?;
662
663 // Remaining process is handled on the handler-side
664 Ok(())
665 }
666
667 /// Returns the current state of the connection.
668 ///
669 /// # Examples
670 /// ```no_run
671 /// # #[tokio::main]
672 /// # async fn main() -> Result<(), async_nats::Error> {
673 /// let client = async_nats::connect("demo.nats.io").await?;
674 /// println!("connection state: {}", client.connection_state());
675 /// # Ok(())
676 /// # }
677 /// ```
678 pub fn connection_state(&self) -> State {
679 self.state.borrow().to_owned()
680 }
681
682 /// Forces the client to reconnect.
683 /// Keep in mind that client will reconnect automatically if the connection is lost and this
684 /// method does not have to be used in normal circumstances.
685 /// However, if you want to force the client to reconnect, for example to re-trigger
686 /// the `auth-callback`, or manually rebalance connections, this method can be useful.
687 /// This method does not wait for connection to be re-established.
688 ///
689 /// # Examples
690 /// ```no_run
691 /// # #[tokio::main]
692 /// # async fn main() -> Result<(), async_nats::Error> {
693 /// let client = async_nats::connect("demo.nats.io").await?;
694 /// client.force_reconnect().await?;
695 /// # Ok(())
696 /// # }
697 /// ```
698 pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
699 self.sender
700 .send(Command::Reconnect)
701 .await
702 .map_err(Into::into)
703 }
704
705 /// Returns struct representing statistics of the whole lifecycle of the client.
706 /// This includes number of bytes sent/received, number of messages sent/received,
707 /// and number of times the connection was established.
708 /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
709 /// across threads.
710 ///
711 /// # Examples
712 /// ```no_run
713 /// # #[tokio::main]
714 /// # async fn main() -> Result<(), async_nats::Error> {
715 /// let client = async_nats::connect("demo.nats.io").await?;
716 /// let statistics = client.statistics();
717 /// println!("client statistics: {:#?}", statistics);
718 /// # Ok(())
719 /// # }
720 /// ```
721 pub fn statistics(&self) -> Arc<Statistics> {
722 self.connection_stats.clone()
723 }
724}
725
726/// Used for building customized requests.
727#[derive(Default)]
728pub struct Request {
729 payload: Option<Bytes>,
730 headers: Option<HeaderMap>,
731 timeout: Option<Option<Duration>>,
732 inbox: Option<String>,
733}
734
735impl Request {
736 pub fn new() -> Request {
737 Default::default()
738 }
739
740 /// Sets the payload of the request. If not used, empty payload will be sent.
741 ///
742 /// # Examples
743 /// ```no_run
744 /// # #[tokio::main]
745 /// # async fn main() -> Result<(), async_nats::Error> {
746 /// let client = async_nats::connect("demo.nats.io").await?;
747 /// let request = async_nats::Request::new().payload("data".into());
748 /// client.send_request("service", request).await?;
749 /// # Ok(())
750 /// # }
751 /// ```
752 pub fn payload(mut self, payload: Bytes) -> Request {
753 self.payload = Some(payload);
754 self
755 }
756
757 /// Sets the headers of the requests.
758 ///
759 /// # Examples
760 /// ```no_run
761 /// # #[tokio::main]
762 /// # async fn main() -> Result<(), async_nats::Error> {
763 /// use std::str::FromStr;
764 /// let client = async_nats::connect("demo.nats.io").await?;
765 /// let mut headers = async_nats::HeaderMap::new();
766 /// headers.insert(
767 /// "X-Example",
768 /// async_nats::HeaderValue::from_str("Value").unwrap(),
769 /// );
770 /// let request = async_nats::Request::new()
771 /// .headers(headers)
772 /// .payload("data".into());
773 /// client.send_request("service", request).await?;
774 /// # Ok(())
775 /// # }
776 /// ```
777 pub fn headers(mut self, headers: HeaderMap) -> Request {
778 self.headers = Some(headers);
779 self
780 }
781
782 /// Sets the custom timeout of the request. Overrides default [Client] timeout.
783 /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
784 /// To use default timeout, simply do not call this function.
785 ///
786 /// # Examples
787 /// ```no_run
788 /// # #[tokio::main]
789 /// # async fn main() -> Result<(), async_nats::Error> {
790 /// let client = async_nats::connect("demo.nats.io").await?;
791 /// let request = async_nats::Request::new()
792 /// .timeout(Some(std::time::Duration::from_secs(15)))
793 /// .payload("data".into());
794 /// client.send_request("service", request).await?;
795 /// # Ok(())
796 /// # }
797 /// ```
798 pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
799 self.timeout = Some(timeout);
800 self
801 }
802
803 /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
804 ///
805 /// # Examples
806 /// ```no_run
807 /// # #[tokio::main]
808 /// # async fn main() -> Result<(), async_nats::Error> {
809 /// use std::str::FromStr;
810 /// let client = async_nats::connect("demo.nats.io").await?;
811 /// let request = async_nats::Request::new()
812 /// .inbox("custom_inbox".into())
813 /// .payload("data".into());
814 /// client.send_request("service", request).await?;
815 /// # Ok(())
816 /// # }
817 /// ```
818 pub fn inbox(mut self, inbox: String) -> Request {
819 self.inbox = Some(inbox);
820 self
821 }
822}
823
824#[derive(Error, Debug)]
825#[error("failed to send reconnect: {0}")]
826pub struct ReconnectError(#[source] crate::Error);
827
828impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
829 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
830 ReconnectError(Box::new(err))
831 }
832}
833
834#[derive(Error, Debug)]
835#[error("failed to send subscribe: {0}")]
836pub struct SubscribeError(#[source] crate::Error);
837
838impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
839 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
840 SubscribeError(Box::new(err))
841 }
842}
843
844#[derive(Error, Debug)]
845#[error("failed to send drain: {0}")]
846pub struct DrainError(#[source] crate::Error);
847
848impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
849 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
850 DrainError(Box::new(err))
851 }
852}
853
854#[derive(Clone, Copy, Debug, PartialEq)]
855pub enum RequestErrorKind {
856 /// There are services listening on requested subject, but they didn't respond
857 /// in time.
858 TimedOut,
859 /// No one is listening on request subject.
860 NoResponders,
861 /// Other errors, client/io related.
862 Other,
863}
864
865impl Display for RequestErrorKind {
866 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
867 match self {
868 Self::TimedOut => write!(f, "request timed out"),
869 Self::NoResponders => write!(f, "no responders"),
870 Self::Other => write!(f, "request failed"),
871 }
872 }
873}
874
875/// Error returned when a core NATS request fails.
876/// To be enumerate over the variants, call [RequestError::kind].
877pub type RequestError = Error<RequestErrorKind>;
878
879impl From<PublishError> for RequestError {
880 fn from(e: PublishError) -> Self {
881 RequestError::with_source(RequestErrorKind::Other, e)
882 }
883}
884
885impl From<SubscribeError> for RequestError {
886 fn from(e: SubscribeError) -> Self {
887 RequestError::with_source(RequestErrorKind::Other, e)
888 }
889}
890
891#[derive(Clone, Copy, Debug, PartialEq)]
892pub enum FlushErrorKind {
893 /// Sending the flush failed client side.
894 SendError,
895 /// Flush failed.
896 /// This can happen mostly in case of connection issues
897 /// that cannot be resolved quickly.
898 FlushError,
899}
900
901impl Display for FlushErrorKind {
902 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
903 match self {
904 Self::SendError => write!(f, "failed to send flush request"),
905 Self::FlushError => write!(f, "flush failed"),
906 }
907 }
908}
909
910pub type FlushError = Error<FlushErrorKind>;
911
912/// Represents statistics for the instance of the client throughout its lifecycle.
913#[derive(Default, Debug)]
914pub struct Statistics {
915 /// Number of bytes received. This does not include the protocol overhead.
916 pub in_bytes: AtomicU64,
917 /// Number of bytes sent. This doe not include the protocol overhead.
918 pub out_bytes: AtomicU64,
919 /// Number of messages received.
920 pub in_messages: AtomicU64,
921 /// Number of messages sent.
922 pub out_messages: AtomicU64,
923 /// Number of times connection was established.
924 /// Initial connect will be counted as well, then all successful reconnects.
925 pub connects: AtomicU64,
926}