async_nats/
options.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use crate::auth::Auth;
15use crate::connector;
16use crate::{Client, ConnectError, Event, ToServerAddrs};
17use base64::engine::general_purpose::URL_SAFE_NO_PAD;
18use base64::engine::Engine;
19use futures::Future;
20use std::fmt::Formatter;
21use std::{
22    fmt,
23    path::{Path, PathBuf},
24    pin::Pin,
25    sync::Arc,
26    time::Duration,
27};
28use tokio::io;
29use tokio_rustls::rustls;
30
/// Connect options. Used to connect with NATS when custom config is needed.
///
/// Values are assembled through the builder-style methods on this type and consumed by
/// [`ConnectOptions::connect`].
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let mut options = async_nats::ConnectOptions::new()
///     .require_tls(true)
///     .ping_interval(std::time::Duration::from_secs(10))
///     .connect("demo.nats.io")
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct ConnectOptions {
    /// Name for the client; set through [`ConnectOptions::name`].
    pub(crate) name: Option<String>,
    /// When `true`, messages published from this connection are not delivered back to it.
    pub(crate) no_echo: bool,
    /// Limit on consecutive reconnect attempts; `None` means no limit.
    pub(crate) max_reconnects: Option<usize>,
    /// Timeout for establishing the underlying connection.
    pub(crate) connection_timeout: Duration,
    /// Credentials filled in by the token / user-password / nkey / jwt builders.
    pub(crate) auth: Auth,
    /// Whether a TLS connection is required.
    pub(crate) tls_required: bool,
    /// Whether TLS is established before reading server info (handshake-first).
    pub(crate) tls_first: bool,
    /// Paths to root certificates.
    pub(crate) certificates: Vec<PathBuf>,
    /// Path to the client certificate, if any.
    pub(crate) client_cert: Option<PathBuf>,
    /// Path to the client key, if any.
    pub(crate) client_key: Option<PathBuf>,
    /// Custom rustls client configuration supplied by the user.
    pub(crate) tls_client_config: Option<rustls::ClientConfig>,
    /// How often the client sends PING to the server.
    pub(crate) ping_interval: Duration,
    /// Capacity for subscribers' message buffers.
    pub(crate) subscription_capacity: usize,
    /// Capacity of the client's op channel; set through [`ConnectOptions::client_capacity`].
    pub(crate) sender_capacity: usize,
    /// Asynchronous callback invoked with each [`Event`].
    pub(crate) event_callback: Option<CallbackArg1<Event, ()>>,
    /// Inbox subject prefix (defaults to `_INBOX`).
    pub(crate) inbox_prefix: String,
    /// Timeout applied to `Client::request`; `None` is stored as given.
    pub(crate) request_timeout: Option<Duration>,
    /// When `true`, the initial connection is established in the background.
    pub(crate) retry_on_initial_connect: bool,
    /// When `true`, servers advertised by the cluster are ignored.
    pub(crate) ignore_discovered_servers: bool,
    /// When `true`, server addresses are tried in the order they were passed.
    pub(crate) retain_servers_order: bool,
    /// Initial capacity of the read buffer.
    pub(crate) read_buffer_capacity: u16,
    /// Computes the reconnect delay; presumably receives the attempt count — verify against the connector.
    pub(crate) reconnect_delay_callback: Box<dyn Fn(usize) -> Duration + Send + Sync + 'static>,
    /// Asynchronous callback that receives the nonce and returns the [`Auth`] to use.
    pub(crate) auth_callback: Option<CallbackArg1<Vec<u8>, Result<Auth, AuthError>>>,
}
69
70impl fmt::Debug for ConnectOptions {
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
72        f.debug_map()
73            .entry(&"name", &self.name)
74            .entry(&"no_echo", &self.no_echo)
75            .entry(&"max_reconnects", &self.max_reconnects)
76            .entry(&"connection_timeout", &self.connection_timeout)
77            .entry(&"tls_required", &self.tls_required)
78            .entry(&"certificates", &self.certificates)
79            .entry(&"client_cert", &self.client_cert)
80            .entry(&"client_key", &self.client_key)
81            .entry(&"tls_client_config", &"XXXXXXXX")
82            .entry(&"tls_first", &self.tls_first)
83            .entry(&"ping_interval", &self.ping_interval)
84            .entry(&"sender_capacity", &self.sender_capacity)
85            .entry(&"inbox_prefix", &self.inbox_prefix)
86            .entry(&"retry_on_initial_connect", &self.retry_on_initial_connect)
87            .entry(&"read_buffer_capacity", &self.read_buffer_capacity)
88            .finish()
89    }
90}
91
92impl Default for ConnectOptions {
93    fn default() -> ConnectOptions {
94        ConnectOptions {
95            name: None,
96            no_echo: false,
97            max_reconnects: None,
98            connection_timeout: Duration::from_secs(5),
99            tls_required: false,
100            tls_first: false,
101            certificates: Vec::new(),
102            client_cert: None,
103            client_key: None,
104            tls_client_config: None,
105            ping_interval: Duration::from_secs(60),
106            sender_capacity: 2048,
107            subscription_capacity: 1024 * 64,
108            event_callback: None,
109            inbox_prefix: "_INBOX".to_string(),
110            request_timeout: Some(Duration::from_secs(10)),
111            retry_on_initial_connect: false,
112            ignore_discovered_servers: false,
113            retain_servers_order: false,
114            read_buffer_capacity: 65535,
115            reconnect_delay_callback: Box::new(|attempts| {
116                connector::reconnect_delay_callback_default(attempts)
117            }),
118            auth: Default::default(),
119            auth_callback: None,
120        }
121    }
122}
123
124impl ConnectOptions {
125    /// Enables customization of NATS connection.
126    ///
127    /// # Examples
128    /// ```no_run
129    /// # #[tokio::main]
130    /// # async fn main() -> Result<(), async_nats::ConnectError> {
131    /// let mut options = async_nats::ConnectOptions::new()
132    ///     .require_tls(true)
133    ///     .ping_interval(std::time::Duration::from_secs(10))
134    ///     .connect("demo.nats.io")
135    ///     .await?;
136    /// # Ok(())
137    /// # }
138    /// ```
139    pub fn new() -> ConnectOptions {
140        ConnectOptions::default()
141    }
142
143    /// Connect to the NATS Server leveraging all passed options.
144    ///
145    /// # Examples
146    /// ```no_run
147    /// # #[tokio::main]
148    /// # async fn main() -> Result<(), async_nats::ConnectError> {
149    /// let nc = async_nats::ConnectOptions::new()
150    ///     .require_tls(true)
151    ///     .connect("demo.nats.io")
152    ///     .await?;
153    /// # Ok(())
154    /// # }
155    /// ```
156    ///
157    /// ## Pass multiple URLs.
158    /// ```no_run
    /// # #[tokio::main]
160    /// # async fn main() -> Result<(), async_nats::Error> {
161    /// use async_nats::ServerAddr;
162    /// let client = async_nats::connect(vec![
163    ///     "demo.nats.io".parse::<ServerAddr>()?,
164    ///     "other.nats.io".parse::<ServerAddr>()?,
165    /// ])
166    /// .await
167    /// .unwrap();
168    /// # Ok(())
169    /// # }
170    /// ```
171    pub async fn connect<A: ToServerAddrs>(self, addrs: A) -> Result<Client, ConnectError> {
172        crate::connect_with_options(addrs, self).await
173    }
174
175    /// Creates a builder with a custom auth callback to be used when authenticating against the NATS Server.
176    /// Requires an asynchronous function that accepts nonce and returns [Auth].
177    /// It will overwrite all other auth methods used.
178    ///
179    ///
180    /// # Example
181    /// ```no_run
182    /// # #[tokio::main]
183    /// # async fn main() -> Result<(), async_nats::ConnectError> {
184    /// async_nats::ConnectOptions::with_auth_callback(move |_| async move {
185    ///     let mut auth = async_nats::Auth::new();
186    ///     auth.username = Some("derek".to_string());
187    ///     auth.password = Some("s3cr3t".to_string());
188    ///     Ok(auth)
189    /// })
190    /// .connect("demo.nats.io")
191    /// .await?;
192    /// # Ok(())
193    /// # }
194    /// ```
195    pub fn with_auth_callback<F, Fut>(callback: F) -> Self
196    where
197        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
198        Fut: Future<Output = std::result::Result<Auth, AuthError>> + 'static + Send + Sync,
199    {
200        let mut options = ConnectOptions::new();
201        options.auth_callback = Some(CallbackArg1::<Vec<u8>, Result<Auth, AuthError>>(Box::new(
202            move |nonce| Box::pin(callback(nonce)),
203        )));
204        options
205    }
206
207    /// Authenticate against NATS Server with the provided token.
208    ///
209    /// # Examples
210    /// ```no_run
211    /// # #[tokio::main]
212    /// # async fn main() -> Result<(), async_nats::ConnectError> {
213    /// let nc = async_nats::ConnectOptions::with_token("t0k3n!".into())
214    ///     .connect("demo.nats.io")
215    ///     .await?;
216    /// # Ok(())
217    /// # }
218    /// ```
219    pub fn with_token(token: String) -> Self {
220        ConnectOptions::default().token(token)
221    }
222
223    /// Use a builder to specify a token, to be used when authenticating against the NATS Server.
224    /// This can be used as a way to mix authentication methods.
225    ///
226    /// # Examples
227    /// ```no_run
228    /// # #[tokio::main]
229    /// # async fn main() -> Result<(), async_nats::ConnectError> {
230    /// let nc = async_nats::ConnectOptions::new()
231    ///     .token("t0k3n!".into())
232    ///     .connect("demo.nats.io")
233    ///     .await?;
234    /// # Ok(())
235    /// # }
236    /// ```
237    pub fn token(mut self, token: String) -> Self {
238        self.auth.token = Some(token);
239        self
240    }
241
242    /// Authenticate against NATS Server with the provided username and password.
243    ///
244    /// # Examples
245    /// ```no_run
246    /// # #[tokio::main]
247    /// # async fn main() -> Result<(), async_nats::ConnectError> {
248    /// let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
249    ///     .connect("demo.nats.io")
250    ///     .await?;
251    /// # Ok(())
252    /// # }
253    /// ```
254    pub fn with_user_and_password(user: String, pass: String) -> Self {
255        ConnectOptions::default().user_and_password(user, pass)
256    }
257
258    /// Use a builder to specify a username and password, to be used when authenticating against the NATS Server.
259    /// This can be used as a way to mix authentication methods.
260    ///
261    /// # Examples
262    /// ```no_run
263    /// # #[tokio::main]
264    /// # async fn main() -> Result<(), async_nats::ConnectError> {
265    /// let nc = async_nats::ConnectOptions::new()
266    ///     .user_and_password("derek".into(), "s3cr3t!".into())
267    ///     .connect("demo.nats.io")
268    ///     .await?;
269    /// # Ok(())
270    /// # }
271    /// ```
272    pub fn user_and_password(mut self, user: String, pass: String) -> Self {
273        self.auth.username = Some(user);
274        self.auth.password = Some(pass);
275        self
276    }
277
278    /// Authenticate with an NKey. Requires an NKey Seed secret.
279    ///
280    /// # Example
281    /// ```no_run
282    /// # #[tokio::main]
283    /// # async fn main() -> Result<(), async_nats::ConnectError> {
284    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
285    /// let nc = async_nats::ConnectOptions::with_nkey(seed.into())
286    ///     .connect("localhost")
287    ///     .await?;
288    /// # Ok(())
289    /// # }
290    /// ```
291    pub fn with_nkey(seed: String) -> Self {
292        ConnectOptions::default().nkey(seed)
293    }
294
295    /// Use a builder to specify an NKey, to be used when authenticating against the NATS Server.
296    /// Requires an NKey Seed Secret.
297    /// This can be used as a way to mix authentication methods.
298    ///
299    /// # Example
300    /// ```no_run
301    /// # #[tokio::main]
302    /// # async fn main() -> Result<(), async_nats::ConnectError> {
303    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
304    /// let nc = async_nats::ConnectOptions::new()
305    ///     .nkey(seed.into())
306    ///     .connect("localhost")
307    ///     .await?;
308    /// # Ok(())
309    /// # }
310    /// ```
311    pub fn nkey(mut self, seed: String) -> Self {
312        self.auth.nkey = Some(seed);
313        self
314    }
315
316    /// Authenticate with a JWT. Requires function to sign the server nonce.
317    /// The signing function is asynchronous.
318    ///
319    /// # Example
320    /// ```no_run
321    /// # #[tokio::main]
322    /// # async fn main() -> Result<(), async_nats::ConnectError> {
323    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
324    /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
325    /// // load jwt from creds file or other secure source
326    /// async fn load_jwt() -> std::io::Result<String> {
327    ///     todo!();
328    /// }
329    /// let jwt = load_jwt().await?;
330    /// let nc = async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
331    ///     let key_pair = key_pair.clone();
332    ///     async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
333    /// })
334    /// .connect("localhost")
335    /// .await?;
336    /// # Ok(())
337    /// # }
338    /// ```
339    pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self
340    where
341        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
342        Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
343    {
344        ConnectOptions::default().jwt(jwt, sign_cb)
345    }
346
347    /// Use a builder to specify a JWT, to be used when authenticating against the NATS Server.
348    /// Requires an asynchronous function to sign the server nonce.
349    /// This can be used as a way to mix authentication methods.
350    ///
351    ///
352    /// # Example
353    /// ```no_run
354    /// # #[tokio::main]
355    /// # async fn main() -> Result<(), async_nats::ConnectError> {
356    /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
357    /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
358    /// // load jwt from creds file or other secure source
359    /// async fn load_jwt() -> std::io::Result<String> {
360    ///     todo!();
361    /// }
362    /// let jwt = load_jwt().await?;
363    /// let nc = async_nats::ConnectOptions::new()
364    ///     .jwt(jwt, move |nonce| {
365    ///         let key_pair = key_pair.clone();
366    ///         async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
367    ///     })
368    ///     .connect("localhost")
369    ///     .await?;
370    /// # Ok(())
371    /// # }
372    /// ```
373    pub fn jwt<F, Fut>(mut self, jwt: String, sign_cb: F) -> Self
374    where
375        F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
376        Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
377    {
378        let sign_cb = Arc::new(sign_cb);
379
380        let jwt_sign_callback = CallbackArg1(Box::new(move |nonce: String| {
381            let sign_cb = sign_cb.clone();
382            Box::pin(async move {
383                let sig = sign_cb(nonce.as_bytes().to_vec())
384                    .await
385                    .map_err(AuthError::new)?;
386                Ok(URL_SAFE_NO_PAD.encode(sig))
387            })
388        }));
389
390        self.auth.jwt = Some(jwt);
391        self.auth.signature_callback = Some(jwt_sign_callback);
392        self
393    }
394
395    /// Authenticate with NATS using a `.creds` file.
396    /// Open the provided file, load its creds,
397    /// and perform the desired authentication
398    ///
399    /// # Example
400    /// ```no_run
401    /// # #[tokio::main]
402    /// # async fn main() -> Result<(), async_nats::ConnectError> {
403    /// let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds")
404    ///     .await?
405    ///     .connect("connect.ngs.global")
406    ///     .await?;
407    /// # Ok(())
408    /// # }
409    /// ```
410    pub async fn with_credentials_file(path: impl AsRef<Path>) -> io::Result<Self> {
411        let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
412        Self::with_credentials(&cred_file_contents)
413    }
414
415    /// Use a builder to specify a credentials file, to be used when authenticating against the NATS Server.
416    /// This will open the credentials file and load its credentials.
417    /// This can be used as a way to mix authentication methods.
418    ///
419    /// # Example
420    /// ```no_run
421    /// # #[tokio::main]
422    /// # async fn main() -> Result<(), async_nats::ConnectError> {
423    /// let nc = async_nats::ConnectOptions::new()
424    ///     .credentials_file("path/to/my.creds")
425    ///     .await?
426    ///     .connect("connect.ngs.global")
427    ///     .await?;
428    /// # Ok(())
429    /// # }
430    /// ```
431    pub async fn credentials_file(self, path: impl AsRef<Path>) -> io::Result<Self> {
432        let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
433        self.credentials(&cred_file_contents)
434    }
435
436    /// Authenticate with NATS using a credential str, in the creds file format.
437    ///
438    /// # Example
439    /// ```no_run
440    /// # #[tokio::main]
441    /// # async fn main() -> Result<(), async_nats::ConnectError> {
442    /// let creds = "-----BEGIN NATS USER JWT-----
443    /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
444    /// ------END NATS USER JWT------
445    ///
446    /// ************************* IMPORTANT *************************
447    /// NKEY Seed printed below can be used sign and prove identity.
448    /// NKEYs are sensitive and should be treated as secrets.
449    ///
450    /// -----BEGIN USER NKEY SEED-----
451    /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
452    /// ------END USER NKEY SEED------
453    /// ";
454    ///
455    /// let nc = async_nats::ConnectOptions::with_credentials(creds)
456    ///     .expect("failed to parse static creds")
457    ///     .connect("connect.ngs.global")
458    ///     .await?;
459    /// # Ok(())
460    /// # }
461    /// ```
462    pub fn with_credentials(creds: &str) -> io::Result<Self> {
463        ConnectOptions::default().credentials(creds)
464    }
465
466    /// Use a builder to specify a credentials string, to be used when authenticating against the NATS Server.
467    /// The string should be in the credentials file format.
468    /// This can be used as a way to mix authentication methods.
469    ///
470    /// # Example
471    /// ```no_run
472    /// # #[tokio::main]
473    /// # async fn main() -> Result<(), async_nats::ConnectError> {
474    /// let creds = "-----BEGIN NATS USER JWT-----
475    /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
476    /// ------END NATS USER JWT------
477    ///
478    /// ************************* IMPORTANT *************************
479    /// NKEY Seed printed below can be used sign and prove identity.
480    /// NKEYs are sensitive and should be treated as secrets.
481    ///
482    /// -----BEGIN USER NKEY SEED-----
483    /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
484    /// ------END USER NKEY SEED------
485    /// ";
486    ///
487    /// let nc = async_nats::ConnectOptions::new()
488    ///     .credentials(creds)
489    ///     .expect("failed to parse static creds")
490    ///     .connect("connect.ngs.global")
491    ///     .await?;
492    /// # Ok(())
493    /// # }
494    /// ```
495    pub fn credentials(self, creds: &str) -> io::Result<Self> {
496        let (jwt, key_pair) = crate::auth_utils::parse_jwt_and_key_from_creds(creds)?;
497        let key_pair = std::sync::Arc::new(key_pair);
498
499        Ok(self.jwt(jwt.to_owned(), move |nonce| {
500            let key_pair = key_pair.clone();
501            async move { key_pair.sign(&nonce).map_err(AuthError::new) }
502        }))
503    }
504
505    /// Loads root certificates by providing the path to them.
506    ///
507    /// # Examples
508    /// ```no_run
509    /// # #[tokio::main]
510    /// # async fn main() -> Result<(), async_nats::ConnectError> {
511    /// let nc = async_nats::ConnectOptions::new()
512    ///     .add_root_certificates("mycerts.pem".into())
513    ///     .connect("demo.nats.io")
514    ///     .await?;
515    /// # Ok(())
516    /// # }
517    /// ```
518    pub fn add_root_certificates(mut self, path: PathBuf) -> ConnectOptions {
519        self.certificates = vec![path];
520        self
521    }
522
523    /// Loads client certificate by providing the path to it.
524    ///
525    /// # Examples
526    /// ```no_run
527    /// # #[tokio::main]
528    /// # async fn main() -> Result<(), async_nats::ConnectError> {
529    /// let nc = async_nats::ConnectOptions::new()
530    ///     .add_client_certificate("cert.pem".into(), "key.pem".into())
531    ///     .connect("demo.nats.io")
532    ///     .await?;
533    /// # Ok(())
534    /// # }
535    /// ```
536    pub fn add_client_certificate(mut self, cert: PathBuf, key: PathBuf) -> ConnectOptions {
537        self.client_cert = Some(cert);
538        self.client_key = Some(key);
539        self
540    }
541
    /// Sets or disables the TLS requirement. If a TLS connection cannot be established while `require_tls(true)` is set, connecting will return an error.
543    ///
544    /// # Examples
545    /// ```no_run
546    /// # #[tokio::main]
547    /// # async fn main() -> Result<(), async_nats::ConnectError> {
548    /// let nc = async_nats::ConnectOptions::new()
549    ///     .require_tls(true)
550    ///     .connect("demo.nats.io")
551    ///     .await?;
552    /// # Ok(())
553    /// # }
554    /// ```
555    pub fn require_tls(mut self, is_required: bool) -> ConnectOptions {
556        self.tls_required = is_required;
557        self
558    }
559
560    /// Changes how tls connection is established. If `tls_first` is set,
561    /// client will try to establish tls before getting info from the server.
562    /// That requires the server to enable `handshake_first` option in the config.
563    pub fn tls_first(mut self) -> ConnectOptions {
564        self.tls_first = true;
565        self.tls_required = true;
566        self
567    }
568
569    /// Sets how often Client sends PING message to the server.
570    ///
571    /// # Examples
572    /// ```no_run
573    /// # use tokio::time::Duration;
574    /// # #[tokio::main]
575    /// # async fn main() -> Result<(), async_nats::ConnectError> {
576    /// async_nats::ConnectOptions::new()
577    ///     .ping_interval(Duration::from_secs(24))
578    ///     .connect("demo.nats.io")
579    ///     .await?;
580    /// # Ok(())
581    /// # }
582    /// ```
583    pub fn ping_interval(mut self, ping_interval: Duration) -> ConnectOptions {
584        self.ping_interval = ping_interval;
585        self
586    }
587
588    /// Sets `no_echo` option which disables delivering messages that were published from the same
589    /// connection.
590    ///
591    /// # Examples
592    /// ```no_run
593    /// # #[tokio::main]
594    /// # async fn main() -> Result<(), async_nats::ConnectError> {
595    /// async_nats::ConnectOptions::new()
596    ///     .no_echo()
597    ///     .connect("demo.nats.io")
598    ///     .await?;
599    /// # Ok(())
600    /// # }
601    /// ```
602    pub fn no_echo(mut self) -> ConnectOptions {
603        self.no_echo = true;
604        self
605    }
606
607    /// Sets the capacity for `Subscribers`. Exceeding it will trigger `slow consumer` error
608    /// callback and drop messages.
609    /// Default is set to 65536 messages buffer.
610    ///
611    /// # Examples
612    /// ```no_run
613    /// # #[tokio::main]
614    /// # async fn main() -> Result<(), async_nats::ConnectError> {
615    /// async_nats::ConnectOptions::new()
616    ///     .subscription_capacity(1024)
617    ///     .connect("demo.nats.io")
618    ///     .await?;
619    /// # Ok(())
620    /// # }
621    /// ```
622    pub fn subscription_capacity(mut self, capacity: usize) -> ConnectOptions {
623        self.subscription_capacity = capacity;
624        self
625    }
626
627    /// Sets a timeout for the underlying TcpStream connection to avoid hangs and deadlocks.
628    /// Default is set to 5 seconds.
629    ///
630    /// # Examples
631    /// ```no_run
632    /// # #[tokio::main]
633    /// # async fn main() -> Result<(), async_nats::ConnectError> {
634    /// async_nats::ConnectOptions::new()
635    ///     .connection_timeout(tokio::time::Duration::from_secs(5))
636    ///     .connect("demo.nats.io")
637    ///     .await?;
638    /// # Ok(())
639    /// # }
640    /// ```
641    pub fn connection_timeout(mut self, timeout: Duration) -> ConnectOptions {
642        self.connection_timeout = timeout;
643        self
644    }
645
646    /// Sets a timeout for `Client::request`. Default value is set to 10 seconds.
647    ///
648    /// # Examples
649    /// ```no_run
650    /// # #[tokio::main]
651    /// # async fn main() -> Result<(), async_nats::ConnectError> {
652    /// async_nats::ConnectOptions::new()
653    ///     .request_timeout(Some(std::time::Duration::from_secs(3)))
654    ///     .connect("demo.nats.io")
655    ///     .await?;
656    /// # Ok(())
657    /// # }
658    /// ```
659    pub fn request_timeout(mut self, timeout: Option<Duration>) -> ConnectOptions {
660        self.request_timeout = timeout;
661        self
662    }
663
    /// Registers an asynchronous callback for client events, such as connection state changes and errors that are received over the wire from the server.
665    ///
666    /// # Examples
667    /// As asynchronous callbacks are still not in `stable` channel, here are some examples how to
668    /// work around this
669    ///
670    /// ## Basic
671    /// If you don't need to move anything into the closure, simple signature can be used:
672    ///
673    /// ```no_run
674    /// # #[tokio::main]
675    /// # async fn main() -> Result<(), async_nats::ConnectError> {
676    /// async_nats::ConnectOptions::new()
677    ///     .event_callback(|event| async move {
678    ///         println!("event occurred: {}", event);
679    ///     })
680    ///     .connect("demo.nats.io")
681    ///     .await?;
682    /// # Ok(())
683    /// # }
684    /// ```
685    ///
686    /// ## Listening to specific event kind
687    /// ```no_run
688    /// # #[tokio::main]
689    /// # async fn main() -> Result<(), async_nats::ConnectError> {
690    /// async_nats::ConnectOptions::new()
691    ///     .event_callback(|event| async move {
692    ///         match event {
693    ///             async_nats::Event::Disconnected => println!("disconnected"),
694    ///             async_nats::Event::Connected => println!("reconnected"),
695    ///             async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
696    ///             other => println!("other event happened: {}", other),
697    ///         }
698    ///     })
699    ///     .connect("demo.nats.io")
700    ///     .await?;
701    /// # Ok(())
702    /// # }
703    /// ```
704    ///
705    /// ## Advanced
706    /// If you need to move something into the closure, here's an example how to do that
707    ///
708    /// ```no_run
709    /// # #[tokio::main]
710    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
711    /// let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
712    /// async_nats::ConnectOptions::new()
713    ///     .event_callback(move |event| {
714    ///         let tx = tx.clone();
715    ///         async move {
716    ///             tx.send(event).await.unwrap();
717    ///         }
718    ///     })
719    ///     .connect("demo.nats.io")
720    ///     .await?;
721    /// # Ok(())
722    /// # }
723    /// ```
724    pub fn event_callback<F, Fut>(mut self, cb: F) -> ConnectOptions
725    where
726        F: Fn(Event) -> Fut + Send + Sync + 'static,
727        Fut: Future<Output = ()> + 'static + Send + Sync,
728    {
729        self.event_callback = Some(CallbackArg1::<Event, ()>(Box::new(move |event| {
730            Box::pin(cb(event))
731        })));
732        self
733    }
734
735    /// Registers a callback for a custom reconnect delay handler that can be used to define a backoff duration strategy.
736    ///
737    /// # Examples
738    /// ```no_run
739    /// # #[tokio::main]
740    /// # async fn main() -> Result<(), async_nats::ConnectError> {
741    /// async_nats::ConnectOptions::new()
742    ///     .reconnect_delay_callback(|attempts| {
743    ///         println!("no of attempts: {attempts}");
744    ///         std::time::Duration::from_millis(std::cmp::min((attempts * 100) as u64, 8000))
745    ///     })
746    ///     .connect("demo.nats.io")
747    ///     .await?;
748    /// # Ok(())
749    /// # }
750    /// ```
751    pub fn reconnect_delay_callback<F>(mut self, cb: F) -> ConnectOptions
752    where
753        F: Fn(usize) -> Duration + Send + Sync + 'static,
754    {
755        self.reconnect_delay_callback = Box::new(cb);
756        self
757    }
758
    /// By default, Client dispatches op's to the Client onto the channel with capacity of 2048.
760    /// This option enables overriding it.
761    ///
762    /// # Examples
763    /// ```
764    /// # #[tokio::main]
765    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
766    /// async_nats::ConnectOptions::new()
767    ///     .client_capacity(256)
768    ///     .connect("demo.nats.io")
769    ///     .await?;
770    /// # Ok(())
771    /// # }
772    /// ```
773    pub fn client_capacity(mut self, capacity: usize) -> ConnectOptions {
774        self.sender_capacity = capacity;
775        self
776    }
777
778    /// Sets custom prefix instead of default `_INBOX`.
779    ///
780    /// # Examples
781    ///
782    /// ```
783    /// # #[tokio::main]
784    /// # async fn main() -> Result<(), async_nats::Error> {
785    /// async_nats::ConnectOptions::new()
786    ///     .custom_inbox_prefix("CUSTOM")
787    ///     .connect("demo.nats.io")
788    ///     .await?;
789    /// # Ok(())
790    /// # }
791    /// ```
792    pub fn custom_inbox_prefix<T: ToString>(mut self, prefix: T) -> ConnectOptions {
793        self.inbox_prefix = prefix.to_string();
794        self
795    }
796
797    /// Sets the name for the client.
798    ///
799    /// # Examples
800    /// ```
801    /// # #[tokio::main]
802    /// # async fn main() -> Result<(), async_nats::Error> {
803    /// async_nats::ConnectOptions::new()
804    ///     .name("rust-service")
805    ///     .connect("demo.nats.io")
806    ///     .await?;
807    /// # Ok(())
808    /// # }
809    /// ```
810    pub fn name<T: ToString>(mut self, name: T) -> ConnectOptions {
811        self.name = Some(name.to_string());
812        self
813    }
814
815    /// By default, [`ConnectOptions::connect`] will return an error if
816    /// the connection to the server cannot be established.
817    ///
818    /// Setting `retry_on_initial_connect` makes the client
819    /// establish the connection in the background.
820    pub fn retry_on_initial_connect(mut self) -> ConnectOptions {
821        self.retry_on_initial_connect = true;
822        self
823    }
824
825    /// Specifies the number of consecutive reconnect attempts the client will
826    /// make before giving up. This is useful for preventing zombie services
827    /// from endlessly reaching the servers, but it can also be a footgun and
828    /// surprise for users who do not expect that the client can give up
829    /// entirely.
830    ///
831    /// Pass `None` or `0` for no limit.
832    ///
833    /// # Examples
834    /// ```
835    /// # #[tokio::main]
836    /// # async fn main() -> Result<(), async_nats::Error> {
837    /// async_nats::ConnectOptions::new()
838    ///     .max_reconnects(None)
839    ///     .connect("demo.nats.io")
840    ///     .await?;
841    /// # Ok(())
842    /// # }
843    /// ```
844    pub fn max_reconnects<T: Into<Option<usize>>>(mut self, max_reconnects: T) -> ConnectOptions {
845        let val: Option<usize> = max_reconnects.into();
846        self.max_reconnects = if val == Some(0) { None } else { val };
847        self
848    }
849
850    /// By default, a server may advertise other servers in the cluster known to it.
851    /// By setting this option, the client will ignore the advertised servers.
852    /// This may be useful if the client may not be able to reach them.
853    pub fn ignore_discovered_servers(mut self) -> ConnectOptions {
854        self.ignore_discovered_servers = true;
855        self
856    }
857
    /// By default, the client will pick a random server to try to connect to.
859    /// This option disables that feature, forcing it to always respect the order
860    /// in which server addresses were passed.
861    pub fn retain_servers_order(mut self) -> ConnectOptions {
862        self.retain_servers_order = true;
863        self
864    }
865
866    /// Allows passing custom rustls tls config.
867    ///
868    /// # Examples
869    /// ```
870    /// # #[tokio::main]
871    /// # async fn main() -> Result<(), async_nats::Error> {
872    /// let mut root_store = async_nats::rustls::RootCertStore::empty();
873    ///
874    /// root_store.add_parsable_certificates(rustls_native_certs::load_native_certs()?);
875    ///
876    /// let tls_client = async_nats::rustls::ClientConfig::builder()
877    ///     .with_root_certificates(root_store)
878    ///     .with_no_client_auth();
879    ///
880    /// let client = async_nats::ConnectOptions::new()
881    ///     .require_tls(true)
882    ///     .tls_client_config(tls_client)
883    ///     .connect("tls://demo.nats.io")
884    ///     .await?;
885    ///
886    /// # Ok(())
887    /// # }
888    /// ```
889    pub fn tls_client_config(mut self, config: rustls::ClientConfig) -> ConnectOptions {
890        self.tls_client_config = Some(config);
891        self
892    }
893
    /// Sets the initial capacity of the read buffer, which is the buffer used to gather partial
    /// protocol messages.
896    ///
897    /// # Examples
898    /// ```
899    /// # #[tokio::main]
900    /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
901    /// async_nats::ConnectOptions::new()
902    ///     .read_buffer_capacity(65535)
903    ///     .connect("demo.nats.io")
904    ///     .await?;
905    /// # Ok(())
906    /// # }
907    /// ```
908    pub fn read_buffer_capacity(mut self, size: u16) -> ConnectOptions {
909        self.read_buffer_capacity = size;
910        self
911    }
912}
913
/// Boxed asynchronous callback: takes one argument of type `A` and returns a pinned, boxed
/// future resolving to `T`.
pub(crate) type AsyncCallbackArg1<A, T> =
    Box<dyn Fn(A) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>> + Send + Sync>;

/// Newtype over [`AsyncCallbackArg1`] that adds an awaitable `call` and a `Debug` impl.
pub(crate) struct CallbackArg1<A, T>(AsyncCallbackArg1<A, T>);

impl<A, T> CallbackArg1<A, T> {
    /// Invokes the stored callback with `arg` and awaits the future it returns.
    pub(crate) async fn call(&self, arg: A) -> T {
        let Self(callback) = self;
        callback(arg).await
    }
}

impl<A, T> fmt::Debug for CallbackArg1<A, T> {
    /// Writes the fixed placeholder `callback`; the closure itself cannot be inspected.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "callback")
    }
}
930
/// Error report from signing callback.
///
/// Carries a plain message string, built from any [`ToString`] value via [`AuthError::new`].
// Original note: "This was needed because std::io::Error isn't Send."
// NOTE(review): `std::io::Error` is `Send + Sync`; it is not `Clone`/`PartialEq`, which may be
// the actual motivation for this type — confirm.
#[derive(Clone, PartialEq)]
pub struct AuthError(String);

impl AuthError {
    /// Creates an error whose message is `s.to_string()`.
    pub fn new(s: impl ToString) -> Self {
        Self(s.to_string())
    }
}

impl std::fmt::Display for AuthError {
    /// Renders the error as `AuthError(<message>)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // `write!` streams into the formatter directly instead of allocating a temporary
        // `String` via `format!`.
        write!(f, "AuthError({})", self.0)
    }
}

impl std::fmt::Debug for AuthError {
    /// Uses the same rendering as the `Display` implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        std::fmt::Display::fmt(self, f)
    }
}

impl std::error::Error for AuthError {}