async_nats/
client.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16
17use crate::connection::State;
18use crate::subject::ToSubject;
19use crate::{PublishMessage, ServerInfo};
20
21use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
22use crate::error::Error;
23use bytes::Bytes;
24use futures::future::TryFutureExt;
25use futures::{Sink, SinkExt as _, StreamExt};
26use once_cell::sync::Lazy;
27use portable_atomic::AtomicU64;
28use regex::Regex;
29use std::fmt::Display;
30use std::sync::atomic::{AtomicUsize, Ordering};
31use std::sync::Arc;
32use std::time::Duration;
33use thiserror::Error;
34use tokio::sync::{mpsc, oneshot};
35use tokio_util::sync::PollSender;
36use tracing::trace;
37
38static VERSION_RE: Lazy<Regex> =
39    Lazy::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
40
41/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
42/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
43pub type PublishError = Error<PublishErrorKind>;
44
45impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
46    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
47        PublishError::with_source(PublishErrorKind::Send, err)
48    }
49}
50
51impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
52    fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
53        PublishError::with_source(PublishErrorKind::Send, err)
54    }
55}
56
57#[derive(Copy, Clone, Debug, PartialEq)]
58pub enum PublishErrorKind {
59    MaxPayloadExceeded,
60    Send,
61}
62
63impl Display for PublishErrorKind {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        match self {
66            PublishErrorKind::MaxPayloadExceeded => write!(f, "max payload size exceeded"),
67            PublishErrorKind::Send => write!(f, "failed to send message"),
68        }
69    }
70}
71
72/// Client is a `Cloneable` handle to NATS connection.
73/// Client should not be created directly. Instead, one of two methods can be used:
74/// [crate::connect] and [crate::ConnectOptions::connect]
75#[derive(Clone, Debug)]
76pub struct Client {
77    info: tokio::sync::watch::Receiver<ServerInfo>,
78    pub(crate) state: tokio::sync::watch::Receiver<State>,
79    pub(crate) sender: mpsc::Sender<Command>,
80    poll_sender: PollSender<Command>,
81    next_subscription_id: Arc<AtomicU64>,
82    subscription_capacity: usize,
83    inbox_prefix: Arc<str>,
84    request_timeout: Option<Duration>,
85    max_payload: Arc<AtomicUsize>,
86    connection_stats: Arc<Statistics>,
87}
88
89impl Sink<PublishMessage> for Client {
90    type Error = PublishError;
91
92    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
93        self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
94    }
95
96    fn start_send(mut self: Pin<&mut Self>, msg: PublishMessage) -> Result<(), Self::Error> {
97        self.poll_sender
98            .start_send_unpin(Command::Publish(msg))
99            .map_err(Into::into)
100    }
101
102    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
103        self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
104    }
105
106    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
107        self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
108    }
109}
110
111impl Client {
112    #[allow(clippy::too_many_arguments)]
113    pub(crate) fn new(
114        info: tokio::sync::watch::Receiver<ServerInfo>,
115        state: tokio::sync::watch::Receiver<State>,
116        sender: mpsc::Sender<Command>,
117        capacity: usize,
118        inbox_prefix: String,
119        request_timeout: Option<Duration>,
120        max_payload: Arc<AtomicUsize>,
121        statistics: Arc<Statistics>,
122    ) -> Client {
123        let poll_sender = PollSender::new(sender.clone());
124        Client {
125            info,
126            state,
127            sender,
128            poll_sender,
129            next_subscription_id: Arc::new(AtomicU64::new(1)),
130            subscription_capacity: capacity,
131            inbox_prefix: inbox_prefix.into(),
132            request_timeout,
133            max_payload,
134            connection_stats: statistics,
135        }
136    }
137
138    /// Returns the default timeout for requests set when creating the client.
139    ///
140    /// # Examples
141    /// ```no_run
142    /// # #[tokio::main]
143    /// # async fn main() -> Result<(), async_nats::Error> {
144    /// let client = async_nats::connect("demo.nats.io").await?;
145    /// println!("default request timeout: {:?}", client.timeout());
146    /// # Ok(())
147    /// # }
148    /// ```
149    pub fn timeout(&self) -> Option<Duration> {
150        self.request_timeout
151    }
152
153    /// Returns last received info from the server.
154    ///
155    /// # Examples
156    ///
157    /// ```no_run
158    /// # #[tokio::main]
159    /// # async fn main () -> Result<(), async_nats::Error> {
160    /// let client = async_nats::connect("demo.nats.io").await?;
161    /// println!("info: {:?}", client.server_info());
162    /// # Ok(())
163    /// # }
164    /// ```
165    pub fn server_info(&self) -> ServerInfo {
166        // We ignore notifying the watcher, as that requires mutable client reference.
167        self.info.borrow().to_owned()
168    }
169
170    /// Returns true if the server version is compatible with the version components.
171    ///
172    /// This has to be used with caution, as it is not guaranteed that the server
173    /// that client is connected to is the same version that the one that is
174    /// a JetStream meta/stream/consumer leader, especially across leafnodes.
175    ///
176    /// # Examples
177    ///
178    /// ```no_run
179    /// # #[tokio::main]
180    /// # async fn main() -> Result<(), async_nats::Error> {
181    /// let client = async_nats::connect("demo.nats.io").await?;
182    /// assert!(client.is_server_compatible(2, 8, 4));
183    /// # Ok(())
184    /// # }
185    /// ```
186    pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
187        let info = self.server_info();
188
189        let server_version_captures = match VERSION_RE.captures(&info.version) {
190            Some(captures) => captures,
191            None => return false,
192        };
193
194        let server_major = server_version_captures
195            .get(1)
196            .map(|m| m.as_str().parse::<i64>().unwrap())
197            .unwrap();
198
199        let server_minor = server_version_captures
200            .get(2)
201            .map(|m| m.as_str().parse::<i64>().unwrap())
202            .unwrap();
203
204        let server_patch = server_version_captures
205            .get(3)
206            .map(|m| m.as_str().parse::<i64>().unwrap())
207            .unwrap();
208
209        if server_major < major
210            || (server_major == major && server_minor < minor)
211            || (server_major == major && server_minor == minor && server_patch < patch)
212        {
213            return false;
214        }
215        true
216    }
217
218    /// Publish a [Message] to a given subject.
219    ///
220    /// # Examples
221    /// ```no_run
222    /// # #[tokio::main]
223    /// # async fn main() -> Result<(), async_nats::Error> {
224    /// let client = async_nats::connect("demo.nats.io").await?;
225    /// client.publish("events.data", "payload".into()).await?;
226    /// # Ok(())
227    /// # }
228    /// ```
229    pub async fn publish<S: ToSubject>(
230        &self,
231        subject: S,
232        payload: Bytes,
233    ) -> Result<(), PublishError> {
234        let subject = subject.to_subject();
235        let max_payload = self.max_payload.load(Ordering::Relaxed);
236        if payload.len() > max_payload {
237            return Err(PublishError::with_source(
238                PublishErrorKind::MaxPayloadExceeded,
239                format!(
240                    "Payload size limit of {} exceeded by message size of {}",
241                    payload.len(),
242                    max_payload
243                ),
244            ));
245        }
246
247        self.sender
248            .send(Command::Publish(PublishMessage {
249                subject,
250                payload,
251                reply: None,
252                headers: None,
253            }))
254            .await?;
255        Ok(())
256    }
257
258    /// Publish a [Message] with headers to a given subject.
259    ///
260    /// # Examples
261    /// ```
262    /// # #[tokio::main]
263    /// # async fn main() -> Result<(), async_nats::Error> {
264    /// use std::str::FromStr;
265    /// let client = async_nats::connect("demo.nats.io").await?;
266    /// let mut headers = async_nats::HeaderMap::new();
267    /// headers.insert(
268    ///     "X-Header",
269    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
270    /// );
271    /// client
272    ///     .publish_with_headers("events.data", headers, "payload".into())
273    ///     .await?;
274    /// # Ok(())
275    /// # }
276    /// ```
277    pub async fn publish_with_headers<S: ToSubject>(
278        &self,
279        subject: S,
280        headers: HeaderMap,
281        payload: Bytes,
282    ) -> Result<(), PublishError> {
283        let subject = subject.to_subject();
284
285        self.sender
286            .send(Command::Publish(PublishMessage {
287                subject,
288                payload,
289                reply: None,
290                headers: Some(headers),
291            }))
292            .await?;
293        Ok(())
294    }
295
296    /// Publish a [Message] to a given subject, with specified response subject
297    /// to which the subscriber can respond.
298    /// This method does not await for the response.
299    ///
300    /// # Examples
301    ///
302    /// ```no_run
303    /// # #[tokio::main]
304    /// # async fn main() -> Result<(), async_nats::Error> {
305    /// let client = async_nats::connect("demo.nats.io").await?;
306    /// client
307    ///     .publish_with_reply("events.data", "reply_subject", "payload".into())
308    ///     .await?;
309    /// # Ok(())
310    /// # }
311    /// ```
312    pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
313        &self,
314        subject: S,
315        reply: R,
316        payload: Bytes,
317    ) -> Result<(), PublishError> {
318        let subject = subject.to_subject();
319        let reply = reply.to_subject();
320
321        self.sender
322            .send(Command::Publish(PublishMessage {
323                subject,
324                payload,
325                reply: Some(reply),
326                headers: None,
327            }))
328            .await?;
329        Ok(())
330    }
331
332    /// Publish a [Message] to a given subject with headers and specified response subject
333    /// to which the subscriber can respond.
334    /// This method does not await for the response.
335    ///
336    /// # Examples
337    ///
338    /// ```no_run
339    /// # #[tokio::main]
340    /// # async fn main() -> Result<(), async_nats::Error> {
341    /// use std::str::FromStr;
342    /// let client = async_nats::connect("demo.nats.io").await?;
343    /// let mut headers = async_nats::HeaderMap::new();
344    /// client
345    ///     .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
346    ///     .await?;
347    /// # Ok(())
348    /// # }
349    /// ```
350    pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
351        &self,
352        subject: S,
353        reply: R,
354        headers: HeaderMap,
355        payload: Bytes,
356    ) -> Result<(), PublishError> {
357        let subject = subject.to_subject();
358        let reply = reply.to_subject();
359
360        self.sender
361            .send(Command::Publish(PublishMessage {
362                subject,
363                payload,
364                reply: Some(reply),
365                headers: Some(headers),
366            }))
367            .await?;
368        Ok(())
369    }
370
371    /// Sends the request with headers.
372    ///
373    /// # Examples
374    /// ```no_run
375    /// # #[tokio::main]
376    /// # async fn main() -> Result<(), async_nats::Error> {
377    /// let client = async_nats::connect("demo.nats.io").await?;
378    /// let response = client.request("service", "data".into()).await?;
379    /// # Ok(())
380    /// # }
381    /// ```
382    pub async fn request<S: ToSubject>(
383        &self,
384        subject: S,
385        payload: Bytes,
386    ) -> Result<Message, RequestError> {
387        let subject = subject.to_subject();
388
389        trace!(
390            "request sent to subject: {} ({})",
391            subject.as_ref(),
392            payload.len()
393        );
394        let request = Request::new().payload(payload);
395        self.send_request(subject, request).await
396    }
397
398    /// Sends the request with headers.
399    ///
400    /// # Examples
401    /// ```no_run
402    /// # #[tokio::main]
403    /// # async fn main() -> Result<(), async_nats::Error> {
404    /// let client = async_nats::connect("demo.nats.io").await?;
405    /// let mut headers = async_nats::HeaderMap::new();
406    /// headers.insert("Key", "Value");
407    /// let response = client
408    ///     .request_with_headers("service", headers, "data".into())
409    ///     .await?;
410    /// # Ok(())
411    /// # }
412    /// ```
413    pub async fn request_with_headers<S: ToSubject>(
414        &self,
415        subject: S,
416        headers: HeaderMap,
417        payload: Bytes,
418    ) -> Result<Message, RequestError> {
419        let subject = subject.to_subject();
420
421        let request = Request::new().headers(headers).payload(payload);
422        self.send_request(subject, request).await
423    }
424
425    /// Sends the request created by the [Request].
426    ///
427    /// # Examples
428    ///
429    /// ```no_run
430    /// # #[tokio::main]
431    /// # async fn main() -> Result<(), async_nats::Error> {
432    /// let client = async_nats::connect("demo.nats.io").await?;
433    /// let request = async_nats::Request::new().payload("data".into());
434    /// let response = client.send_request("service", request).await?;
435    /// # Ok(())
436    /// # }
437    /// ```
438    pub async fn send_request<S: ToSubject>(
439        &self,
440        subject: S,
441        request: Request,
442    ) -> Result<Message, RequestError> {
443        let subject = subject.to_subject();
444
445        if let Some(inbox) = request.inbox {
446            let timeout = request.timeout.unwrap_or(self.request_timeout);
447            let mut subscriber = self.subscribe(inbox.clone()).await?;
448            let payload: Bytes = request.payload.unwrap_or_default();
449            match request.headers {
450                Some(headers) => {
451                    self.publish_with_reply_and_headers(subject, inbox, headers, payload)
452                        .await?
453                }
454                None => self.publish_with_reply(subject, inbox, payload).await?,
455            }
456            let request = match timeout {
457                Some(timeout) => {
458                    tokio::time::timeout(timeout, subscriber.next())
459                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
460                        .await?
461                }
462                None => subscriber.next().await,
463            };
464            match request {
465                Some(message) => {
466                    if message.status == Some(StatusCode::NO_RESPONDERS) {
467                        return Err(RequestError::with_source(
468                            RequestErrorKind::NoResponders,
469                            "no responders",
470                        ));
471                    }
472                    Ok(message)
473                }
474                None => Err(RequestError::with_source(
475                    RequestErrorKind::Other,
476                    "broken pipe",
477                )),
478            }
479        } else {
480            let (sender, receiver) = oneshot::channel();
481
482            let payload = request.payload.unwrap_or_default();
483            let respond = self.new_inbox().into();
484            let headers = request.headers;
485
486            self.sender
487                .send(Command::Request {
488                    subject,
489                    payload,
490                    respond,
491                    headers,
492                    sender,
493                })
494                .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
495                .await?;
496
497            let timeout = request.timeout.unwrap_or(self.request_timeout);
498            let request = match timeout {
499                Some(timeout) => {
500                    tokio::time::timeout(timeout, receiver)
501                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
502                        .await?
503                }
504                None => receiver.await,
505            };
506
507            match request {
508                Ok(message) => {
509                    if message.status == Some(StatusCode::NO_RESPONDERS) {
510                        return Err(RequestError::with_source(
511                            RequestErrorKind::NoResponders,
512                            "no responders",
513                        ));
514                    }
515                    Ok(message)
516                }
517                Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
518            }
519        }
520    }
521
522    /// Create a new globally unique inbox which can be used for replies.
523    ///
524    /// # Examples
525    ///
526    /// ```no_run
527    /// # #[tokio::main]
528    /// # async fn main() -> Result<(), async_nats::Error> {
529    /// # let mut nc = async_nats::connect("demo.nats.io").await?;
530    /// let reply = nc.new_inbox();
531    /// let rsub = nc.subscribe(reply).await?;
532    /// # Ok(())
533    /// # }
534    /// ```
535    pub fn new_inbox(&self) -> String {
536        format!("{}.{}", self.inbox_prefix, nuid::next())
537    }
538
539    /// Subscribes to a subject to receive [messages][Message].
540    ///
541    /// # Examples
542    ///
543    /// ```no_run
544    /// # #[tokio::main]
545    /// # async fn main() -> Result<(), async_nats::Error> {
546    /// use futures::StreamExt;
547    /// let client = async_nats::connect("demo.nats.io").await?;
548    /// let mut subscription = client.subscribe("events.>").await?;
549    /// while let Some(message) = subscription.next().await {
550    ///     println!("received message: {:?}", message);
551    /// }
552    /// # Ok(())
553    /// # }
554    /// ```
555    pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
556        let subject = subject.to_subject();
557        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
558        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
559
560        self.sender
561            .send(Command::Subscribe {
562                sid,
563                subject,
564                queue_group: None,
565                sender,
566            })
567            .await?;
568
569        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
570    }
571
572    /// Subscribes to a subject with a queue group to receive [messages][Message].
573    ///
574    /// # Examples
575    ///
576    /// ```no_run
577    /// # #[tokio::main]
578    /// # async fn main() -> Result<(), async_nats::Error> {
579    /// use futures::StreamExt;
580    /// let client = async_nats::connect("demo.nats.io").await?;
581    /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
582    /// while let Some(message) = subscription.next().await {
583    ///     println!("received message: {:?}", message);
584    /// }
585    /// # Ok(())
586    /// # }
587    /// ```
588    pub async fn queue_subscribe<S: ToSubject>(
589        &self,
590        subject: S,
591        queue_group: String,
592    ) -> Result<Subscriber, SubscribeError> {
593        let subject = subject.to_subject();
594
595        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
596        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
597
598        self.sender
599            .send(Command::Subscribe {
600                sid,
601                subject,
602                queue_group: Some(queue_group),
603                sender,
604            })
605            .await?;
606
607        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
608    }
609
610    /// Flushes the internal buffer ensuring that all messages are sent.
611    ///
612    /// # Examples
613    ///
614    /// ```no_run
615    /// # #[tokio::main]
616    /// # async fn main() -> Result<(), async_nats::Error> {
617    /// let client = async_nats::connect("demo.nats.io").await?;
618    /// client.flush().await?;
619    /// # Ok(())
620    /// # }
621    /// ```
622    pub async fn flush(&self) -> Result<(), FlushError> {
623        let (tx, rx) = tokio::sync::oneshot::channel();
624        self.sender
625            .send(Command::Flush { observer: tx })
626            .await
627            .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
628
629        rx.await
630            .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
631        Ok(())
632    }
633
634    /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
635    /// messages, then closes the connection. Once completed, any associated streams associated with the
636    /// client will be closed, and further client commands will fail
637    ///
638    /// # Examples
639    ///
640    /// ```no_run
641    /// # #[tokio::main]
642    /// # async fn main() -> Result<(), async_nats::Error> {
643    /// use futures::StreamExt;
644    /// let client = async_nats::connect("demo.nats.io").await?;
645    /// let mut subscription = client.subscribe("events.>").await?;
646    ///
647    /// client.drain().await?;
648    ///
649    /// # // existing subscriptions are closed and further commands will fail
650    /// assert!(subscription.next().await.is_none());
651    /// client
652    ///     .subscribe("events.>")
653    ///     .await
654    ///     .expect_err("Expected further commands to fail");
655    ///
656    /// # Ok(())
657    /// # }
658    /// ```
659    pub async fn drain(&self) -> Result<(), DrainError> {
660        // Drain all subscriptions
661        self.sender.send(Command::Drain { sid: None }).await?;
662
663        // Remaining process is handled on the handler-side
664        Ok(())
665    }
666
667    /// Returns the current state of the connection.
668    ///
669    /// # Examples
670    /// ```no_run
671    /// # #[tokio::main]
672    /// # async fn main() -> Result<(), async_nats::Error> {
673    /// let client = async_nats::connect("demo.nats.io").await?;
674    /// println!("connection state: {}", client.connection_state());
675    /// # Ok(())
676    /// # }
677    /// ```
678    pub fn connection_state(&self) -> State {
679        self.state.borrow().to_owned()
680    }
681
682    /// Forces the client to reconnect.
683    /// Keep in mind that client will reconnect automatically if the connection is lost and this
684    /// method does not have to be used in normal circumstances.
685    /// However, if you want to force the client to reconnect, for example to re-trigger
686    /// the `auth-callback`, or manually rebalance connections, this method can be useful.
687    /// This method does not wait for connection to be re-established.
688    ///
689    /// # Examples
690    /// ```no_run
691    /// # #[tokio::main]
692    /// # async fn main() -> Result<(), async_nats::Error> {
693    /// let client = async_nats::connect("demo.nats.io").await?;
694    /// client.force_reconnect().await?;
695    /// # Ok(())
696    /// # }
697    /// ```
698    pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
699        self.sender
700            .send(Command::Reconnect)
701            .await
702            .map_err(Into::into)
703    }
704
705    /// Returns struct representing statistics of the whole lifecycle of the client.
706    /// This includes number of bytes sent/received, number of messages sent/received,
707    /// and number of times the connection was established.
708    /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
709    /// across threads.
710    ///
711    /// # Examples
712    /// ```no_run
713    /// # #[tokio::main]
714    /// # async fn main() -> Result<(), async_nats::Error> {
715    /// let client = async_nats::connect("demo.nats.io").await?;
716    /// let statistics = client.statistics();
717    /// println!("client statistics: {:#?}", statistics);
718    /// # Ok(())
719    /// # }
720    /// ```
721    pub fn statistics(&self) -> Arc<Statistics> {
722        self.connection_stats.clone()
723    }
724}
725
726/// Used for building customized requests.
727#[derive(Default)]
728pub struct Request {
729    payload: Option<Bytes>,
730    headers: Option<HeaderMap>,
731    timeout: Option<Option<Duration>>,
732    inbox: Option<String>,
733}
734
735impl Request {
736    pub fn new() -> Request {
737        Default::default()
738    }
739
740    /// Sets the payload of the request. If not used, empty payload will be sent.
741    ///
742    /// # Examples
743    /// ```no_run
744    /// # #[tokio::main]
745    /// # async fn main() -> Result<(), async_nats::Error> {
746    /// let client = async_nats::connect("demo.nats.io").await?;
747    /// let request = async_nats::Request::new().payload("data".into());
748    /// client.send_request("service", request).await?;
749    /// # Ok(())
750    /// # }
751    /// ```
752    pub fn payload(mut self, payload: Bytes) -> Request {
753        self.payload = Some(payload);
754        self
755    }
756
757    /// Sets the headers of the requests.
758    ///
759    /// # Examples
760    /// ```no_run
761    /// # #[tokio::main]
762    /// # async fn main() -> Result<(), async_nats::Error> {
763    /// use std::str::FromStr;
764    /// let client = async_nats::connect("demo.nats.io").await?;
765    /// let mut headers = async_nats::HeaderMap::new();
766    /// headers.insert(
767    ///     "X-Example",
768    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
769    /// );
770    /// let request = async_nats::Request::new()
771    ///     .headers(headers)
772    ///     .payload("data".into());
773    /// client.send_request("service", request).await?;
774    /// # Ok(())
775    /// # }
776    /// ```
777    pub fn headers(mut self, headers: HeaderMap) -> Request {
778        self.headers = Some(headers);
779        self
780    }
781
782    /// Sets the custom timeout of the request. Overrides default [Client] timeout.
783    /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
784    /// To use default timeout, simply do not call this function.
785    ///
786    /// # Examples
787    /// ```no_run
788    /// # #[tokio::main]
789    /// # async fn main() -> Result<(), async_nats::Error> {
790    /// let client = async_nats::connect("demo.nats.io").await?;
791    /// let request = async_nats::Request::new()
792    ///     .timeout(Some(std::time::Duration::from_secs(15)))
793    ///     .payload("data".into());
794    /// client.send_request("service", request).await?;
795    /// # Ok(())
796    /// # }
797    /// ```
798    pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
799        self.timeout = Some(timeout);
800        self
801    }
802
803    /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
804    ///
805    /// # Examples
806    /// ```no_run
807    /// # #[tokio::main]
808    /// # async fn main() -> Result<(), async_nats::Error> {
809    /// use std::str::FromStr;
810    /// let client = async_nats::connect("demo.nats.io").await?;
811    /// let request = async_nats::Request::new()
812    ///     .inbox("custom_inbox".into())
813    ///     .payload("data".into());
814    /// client.send_request("service", request).await?;
815    /// # Ok(())
816    /// # }
817    /// ```
818    pub fn inbox(mut self, inbox: String) -> Request {
819        self.inbox = Some(inbox);
820        self
821    }
822}
823
824#[derive(Error, Debug)]
825#[error("failed to send reconnect: {0}")]
826pub struct ReconnectError(#[source] crate::Error);
827
828impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
829    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
830        ReconnectError(Box::new(err))
831    }
832}
833
834#[derive(Error, Debug)]
835#[error("failed to send subscribe: {0}")]
836pub struct SubscribeError(#[source] crate::Error);
837
838impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
839    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
840        SubscribeError(Box::new(err))
841    }
842}
843
844#[derive(Error, Debug)]
845#[error("failed to send drain: {0}")]
846pub struct DrainError(#[source] crate::Error);
847
848impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
849    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
850        DrainError(Box::new(err))
851    }
852}
853
854#[derive(Clone, Copy, Debug, PartialEq)]
855pub enum RequestErrorKind {
856    /// There are services listening on requested subject, but they didn't respond
857    /// in time.
858    TimedOut,
859    /// No one is listening on request subject.
860    NoResponders,
861    /// Other errors, client/io related.
862    Other,
863}
864
865impl Display for RequestErrorKind {
866    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
867        match self {
868            Self::TimedOut => write!(f, "request timed out"),
869            Self::NoResponders => write!(f, "no responders"),
870            Self::Other => write!(f, "request failed"),
871        }
872    }
873}
874
875/// Error returned when a core NATS request fails.
876/// To be enumerate over the variants, call [RequestError::kind].
877pub type RequestError = Error<RequestErrorKind>;
878
879impl From<PublishError> for RequestError {
880    fn from(e: PublishError) -> Self {
881        RequestError::with_source(RequestErrorKind::Other, e)
882    }
883}
884
885impl From<SubscribeError> for RequestError {
886    fn from(e: SubscribeError) -> Self {
887        RequestError::with_source(RequestErrorKind::Other, e)
888    }
889}
890
891#[derive(Clone, Copy, Debug, PartialEq)]
892pub enum FlushErrorKind {
893    /// Sending the flush failed client side.
894    SendError,
895    /// Flush failed.
896    /// This can happen mostly in case of connection issues
897    /// that cannot be resolved quickly.
898    FlushError,
899}
900
901impl Display for FlushErrorKind {
902    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
903        match self {
904            Self::SendError => write!(f, "failed to send flush request"),
905            Self::FlushError => write!(f, "flush failed"),
906        }
907    }
908}
909
910pub type FlushError = Error<FlushErrorKind>;
911
912/// Represents statistics for the instance of the client throughout its lifecycle.
913#[derive(Default, Debug)]
914pub struct Statistics {
915    /// Number of bytes received. This does not include the protocol overhead.
916    pub in_bytes: AtomicU64,
917    /// Number of bytes sent. This doe not include the protocol overhead.
918    pub out_bytes: AtomicU64,
919    /// Number of messages received.
920    pub in_messages: AtomicU64,
921    /// Number of messages sent.
922    pub out_messages: AtomicU64,
923    /// Number of times connection was established.
924    /// Initial connect will be counted as well, then all successful reconnects.
925    pub connects: AtomicU64,
926}