async_nats/options.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use crate::auth::Auth;
15use crate::connector;
16use crate::{Client, ConnectError, Event, ToServerAddrs};
17use base64::engine::general_purpose::URL_SAFE_NO_PAD;
18use base64::engine::Engine;
19use futures::Future;
20use std::fmt::Formatter;
21use std::{
22 fmt,
23 path::{Path, PathBuf},
24 pin::Pin,
25 sync::Arc,
26 time::Duration,
27};
28use tokio::io;
29use tokio_rustls::rustls;
30
/// Connect options. Used to connect with NATS when custom config is needed.
///
/// Values are assembled builder-style: every setter consumes the options and
/// returns the updated value, and [`ConnectOptions::connect`] finally uses them
/// to open the connection.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::ConnectError> {
/// let client = async_nats::ConnectOptions::new()
///     .require_tls(true)
///     .ping_interval(std::time::Duration::from_secs(10))
///     .connect("demo.nats.io")
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct ConnectOptions {
    /// Client name, set through `name`; `None` by default.
    pub(crate) name: Option<String>,
    /// When `true`, messages published from this connection are not delivered back to it.
    pub(crate) no_echo: bool,
    /// Limit of consecutive reconnect attempts; `None` means no limit.
    pub(crate) max_reconnects: Option<usize>,
    /// Timeout for the underlying TcpStream connection (5 seconds by default).
    pub(crate) connection_timeout: Duration,
    /// Credentials collected by the token / user / NKey / JWT builder methods.
    pub(crate) auth: Auth,
    /// Whether a TLS connection is required.
    pub(crate) tls_required: bool,
    /// Whether TLS should be established before getting info from the server.
    pub(crate) tls_first: bool,
    /// Paths of root certificates to load.
    pub(crate) certificates: Vec<PathBuf>,
    /// Path of the client certificate, set together with `client_key`.
    pub(crate) client_cert: Option<PathBuf>,
    /// Path of the client key, set together with `client_cert`.
    pub(crate) client_key: Option<PathBuf>,
    /// Custom rustls client configuration supplied by the caller, if any.
    pub(crate) tls_client_config: Option<rustls::ClientConfig>,
    /// How often the client sends PING to the server (60 seconds by default).
    pub(crate) ping_interval: Duration,
    /// Capacity for `Subscribers` (65536 by default).
    pub(crate) subscription_capacity: usize,
    /// Capacity set through `client_capacity` (2048 by default).
    pub(crate) sender_capacity: usize,
    /// Asynchronous callback registered through `event_callback`, receiving an `Event`.
    pub(crate) event_callback: Option<CallbackArg1<Event, ()>>,
    /// Inbox prefix (`_INBOX` by default).
    pub(crate) inbox_prefix: String,
    /// Timeout for `Client::request` (10 seconds by default); `None` is also accepted.
    pub(crate) request_timeout: Option<Duration>,
    /// When `true`, the initial connection is established in the background.
    pub(crate) retry_on_initial_connect: bool,
    /// When `true`, servers advertised by the cluster are ignored.
    pub(crate) ignore_discovered_servers: bool,
    /// When `true`, servers are tried in the order in which they were passed.
    pub(crate) retain_servers_order: bool,
    /// Initial capacity of the read buffer (65535 by default).
    pub(crate) read_buffer_capacity: u16,
    /// Maps the number of reconnect attempts to the delay before the next attempt.
    pub(crate) reconnect_delay_callback: Box<dyn Fn(usize) -> Duration + Send + Sync + 'static>,
    /// Asynchronous callback that accepts the nonce and returns the [`Auth`] to use.
    pub(crate) auth_callback: Option<CallbackArg1<Vec<u8>, Result<Auth, AuthError>>>,
}
69
70impl fmt::Debug for ConnectOptions {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
72 f.debug_map()
73 .entry(&"name", &self.name)
74 .entry(&"no_echo", &self.no_echo)
75 .entry(&"max_reconnects", &self.max_reconnects)
76 .entry(&"connection_timeout", &self.connection_timeout)
77 .entry(&"tls_required", &self.tls_required)
78 .entry(&"certificates", &self.certificates)
79 .entry(&"client_cert", &self.client_cert)
80 .entry(&"client_key", &self.client_key)
81 .entry(&"tls_client_config", &"XXXXXXXX")
82 .entry(&"tls_first", &self.tls_first)
83 .entry(&"ping_interval", &self.ping_interval)
84 .entry(&"sender_capacity", &self.sender_capacity)
85 .entry(&"inbox_prefix", &self.inbox_prefix)
86 .entry(&"retry_on_initial_connect", &self.retry_on_initial_connect)
87 .entry(&"read_buffer_capacity", &self.read_buffer_capacity)
88 .finish()
89 }
90}
91
92impl Default for ConnectOptions {
93 fn default() -> ConnectOptions {
94 ConnectOptions {
95 name: None,
96 no_echo: false,
97 max_reconnects: None,
98 connection_timeout: Duration::from_secs(5),
99 tls_required: false,
100 tls_first: false,
101 certificates: Vec::new(),
102 client_cert: None,
103 client_key: None,
104 tls_client_config: None,
105 ping_interval: Duration::from_secs(60),
106 sender_capacity: 2048,
107 subscription_capacity: 1024 * 64,
108 event_callback: None,
109 inbox_prefix: "_INBOX".to_string(),
110 request_timeout: Some(Duration::from_secs(10)),
111 retry_on_initial_connect: false,
112 ignore_discovered_servers: false,
113 retain_servers_order: false,
114 read_buffer_capacity: 65535,
115 reconnect_delay_callback: Box::new(|attempts| {
116 connector::reconnect_delay_callback_default(attempts)
117 }),
118 auth: Default::default(),
119 auth_callback: None,
120 }
121 }
122}
123
124impl ConnectOptions {
125 /// Enables customization of NATS connection.
126 ///
127 /// # Examples
128 /// ```no_run
129 /// # #[tokio::main]
130 /// # async fn main() -> Result<(), async_nats::ConnectError> {
131 /// let mut options = async_nats::ConnectOptions::new()
132 /// .require_tls(true)
133 /// .ping_interval(std::time::Duration::from_secs(10))
134 /// .connect("demo.nats.io")
135 /// .await?;
136 /// # Ok(())
137 /// # }
138 /// ```
139 pub fn new() -> ConnectOptions {
140 ConnectOptions::default()
141 }
142
143 /// Connect to the NATS Server leveraging all passed options.
144 ///
145 /// # Examples
146 /// ```no_run
147 /// # #[tokio::main]
148 /// # async fn main() -> Result<(), async_nats::ConnectError> {
149 /// let nc = async_nats::ConnectOptions::new()
150 /// .require_tls(true)
151 /// .connect("demo.nats.io")
152 /// .await?;
153 /// # Ok(())
154 /// # }
155 /// ```
156 ///
157 /// ## Pass multiple URLs.
158 /// ```no_run
159 /// #[tokio::main]
160 /// # async fn main() -> Result<(), async_nats::Error> {
161 /// use async_nats::ServerAddr;
162 /// let client = async_nats::connect(vec![
163 /// "demo.nats.io".parse::<ServerAddr>()?,
164 /// "other.nats.io".parse::<ServerAddr>()?,
165 /// ])
166 /// .await
167 /// .unwrap();
168 /// # Ok(())
169 /// # }
170 /// ```
171 pub async fn connect<A: ToServerAddrs>(self, addrs: A) -> Result<Client, ConnectError> {
172 crate::connect_with_options(addrs, self).await
173 }
174
175 /// Creates a builder with a custom auth callback to be used when authenticating against the NATS Server.
176 /// Requires an asynchronous function that accepts nonce and returns [Auth].
177 /// It will overwrite all other auth methods used.
178 ///
179 ///
180 /// # Example
181 /// ```no_run
182 /// # #[tokio::main]
183 /// # async fn main() -> Result<(), async_nats::ConnectError> {
184 /// async_nats::ConnectOptions::with_auth_callback(move |_| async move {
185 /// let mut auth = async_nats::Auth::new();
186 /// auth.username = Some("derek".to_string());
187 /// auth.password = Some("s3cr3t".to_string());
188 /// Ok(auth)
189 /// })
190 /// .connect("demo.nats.io")
191 /// .await?;
192 /// # Ok(())
193 /// # }
194 /// ```
195 pub fn with_auth_callback<F, Fut>(callback: F) -> Self
196 where
197 F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
198 Fut: Future<Output = std::result::Result<Auth, AuthError>> + 'static + Send + Sync,
199 {
200 let mut options = ConnectOptions::new();
201 options.auth_callback = Some(CallbackArg1::<Vec<u8>, Result<Auth, AuthError>>(Box::new(
202 move |nonce| Box::pin(callback(nonce)),
203 )));
204 options
205 }
206
207 /// Authenticate against NATS Server with the provided token.
208 ///
209 /// # Examples
210 /// ```no_run
211 /// # #[tokio::main]
212 /// # async fn main() -> Result<(), async_nats::ConnectError> {
213 /// let nc = async_nats::ConnectOptions::with_token("t0k3n!".into())
214 /// .connect("demo.nats.io")
215 /// .await?;
216 /// # Ok(())
217 /// # }
218 /// ```
219 pub fn with_token(token: String) -> Self {
220 ConnectOptions::default().token(token)
221 }
222
223 /// Use a builder to specify a token, to be used when authenticating against the NATS Server.
224 /// This can be used as a way to mix authentication methods.
225 ///
226 /// # Examples
227 /// ```no_run
228 /// # #[tokio::main]
229 /// # async fn main() -> Result<(), async_nats::ConnectError> {
230 /// let nc = async_nats::ConnectOptions::new()
231 /// .token("t0k3n!".into())
232 /// .connect("demo.nats.io")
233 /// .await?;
234 /// # Ok(())
235 /// # }
236 /// ```
237 pub fn token(mut self, token: String) -> Self {
238 self.auth.token = Some(token);
239 self
240 }
241
242 /// Authenticate against NATS Server with the provided username and password.
243 ///
244 /// # Examples
245 /// ```no_run
246 /// # #[tokio::main]
247 /// # async fn main() -> Result<(), async_nats::ConnectError> {
248 /// let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
249 /// .connect("demo.nats.io")
250 /// .await?;
251 /// # Ok(())
252 /// # }
253 /// ```
254 pub fn with_user_and_password(user: String, pass: String) -> Self {
255 ConnectOptions::default().user_and_password(user, pass)
256 }
257
258 /// Use a builder to specify a username and password, to be used when authenticating against the NATS Server.
259 /// This can be used as a way to mix authentication methods.
260 ///
261 /// # Examples
262 /// ```no_run
263 /// # #[tokio::main]
264 /// # async fn main() -> Result<(), async_nats::ConnectError> {
265 /// let nc = async_nats::ConnectOptions::new()
266 /// .user_and_password("derek".into(), "s3cr3t!".into())
267 /// .connect("demo.nats.io")
268 /// .await?;
269 /// # Ok(())
270 /// # }
271 /// ```
272 pub fn user_and_password(mut self, user: String, pass: String) -> Self {
273 self.auth.username = Some(user);
274 self.auth.password = Some(pass);
275 self
276 }
277
278 /// Authenticate with an NKey. Requires an NKey Seed secret.
279 ///
280 /// # Example
281 /// ```no_run
282 /// # #[tokio::main]
283 /// # async fn main() -> Result<(), async_nats::ConnectError> {
284 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
285 /// let nc = async_nats::ConnectOptions::with_nkey(seed.into())
286 /// .connect("localhost")
287 /// .await?;
288 /// # Ok(())
289 /// # }
290 /// ```
291 pub fn with_nkey(seed: String) -> Self {
292 ConnectOptions::default().nkey(seed)
293 }
294
295 /// Use a builder to specify an NKey, to be used when authenticating against the NATS Server.
296 /// Requires an NKey Seed Secret.
297 /// This can be used as a way to mix authentication methods.
298 ///
299 /// # Example
300 /// ```no_run
301 /// # #[tokio::main]
302 /// # async fn main() -> Result<(), async_nats::ConnectError> {
303 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
304 /// let nc = async_nats::ConnectOptions::new()
305 /// .nkey(seed.into())
306 /// .connect("localhost")
307 /// .await?;
308 /// # Ok(())
309 /// # }
310 /// ```
311 pub fn nkey(mut self, seed: String) -> Self {
312 self.auth.nkey = Some(seed);
313 self
314 }
315
316 /// Authenticate with a JWT. Requires function to sign the server nonce.
317 /// The signing function is asynchronous.
318 ///
319 /// # Example
320 /// ```no_run
321 /// # #[tokio::main]
322 /// # async fn main() -> Result<(), async_nats::ConnectError> {
323 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
324 /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
325 /// // load jwt from creds file or other secure source
326 /// async fn load_jwt() -> std::io::Result<String> {
327 /// todo!();
328 /// }
329 /// let jwt = load_jwt().await?;
330 /// let nc = async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
331 /// let key_pair = key_pair.clone();
332 /// async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
333 /// })
334 /// .connect("localhost")
335 /// .await?;
336 /// # Ok(())
337 /// # }
338 /// ```
339 pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self
340 where
341 F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
342 Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
343 {
344 ConnectOptions::default().jwt(jwt, sign_cb)
345 }
346
347 /// Use a builder to specify a JWT, to be used when authenticating against the NATS Server.
348 /// Requires an asynchronous function to sign the server nonce.
349 /// This can be used as a way to mix authentication methods.
350 ///
351 ///
352 /// # Example
353 /// ```no_run
354 /// # #[tokio::main]
355 /// # async fn main() -> Result<(), async_nats::ConnectError> {
356 /// let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
357 /// let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
358 /// // load jwt from creds file or other secure source
359 /// async fn load_jwt() -> std::io::Result<String> {
360 /// todo!();
361 /// }
362 /// let jwt = load_jwt().await?;
363 /// let nc = async_nats::ConnectOptions::new()
364 /// .jwt(jwt, move |nonce| {
365 /// let key_pair = key_pair.clone();
366 /// async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
367 /// })
368 /// .connect("localhost")
369 /// .await?;
370 /// # Ok(())
371 /// # }
372 /// ```
373 pub fn jwt<F, Fut>(mut self, jwt: String, sign_cb: F) -> Self
374 where
375 F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
376 Fut: Future<Output = std::result::Result<Vec<u8>, AuthError>> + 'static + Send + Sync,
377 {
378 let sign_cb = Arc::new(sign_cb);
379
380 let jwt_sign_callback = CallbackArg1(Box::new(move |nonce: String| {
381 let sign_cb = sign_cb.clone();
382 Box::pin(async move {
383 let sig = sign_cb(nonce.as_bytes().to_vec())
384 .await
385 .map_err(AuthError::new)?;
386 Ok(URL_SAFE_NO_PAD.encode(sig))
387 })
388 }));
389
390 self.auth.jwt = Some(jwt);
391 self.auth.signature_callback = Some(jwt_sign_callback);
392 self
393 }
394
395 /// Authenticate with NATS using a `.creds` file.
396 /// Open the provided file, load its creds,
397 /// and perform the desired authentication
398 ///
399 /// # Example
400 /// ```no_run
401 /// # #[tokio::main]
402 /// # async fn main() -> Result<(), async_nats::ConnectError> {
403 /// let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds")
404 /// .await?
405 /// .connect("connect.ngs.global")
406 /// .await?;
407 /// # Ok(())
408 /// # }
409 /// ```
410 pub async fn with_credentials_file(path: impl AsRef<Path>) -> io::Result<Self> {
411 let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
412 Self::with_credentials(&cred_file_contents)
413 }
414
415 /// Use a builder to specify a credentials file, to be used when authenticating against the NATS Server.
416 /// This will open the credentials file and load its credentials.
417 /// This can be used as a way to mix authentication methods.
418 ///
419 /// # Example
420 /// ```no_run
421 /// # #[tokio::main]
422 /// # async fn main() -> Result<(), async_nats::ConnectError> {
423 /// let nc = async_nats::ConnectOptions::new()
424 /// .credentials_file("path/to/my.creds")
425 /// .await?
426 /// .connect("connect.ngs.global")
427 /// .await?;
428 /// # Ok(())
429 /// # }
430 /// ```
431 pub async fn credentials_file(self, path: impl AsRef<Path>) -> io::Result<Self> {
432 let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?;
433 self.credentials(&cred_file_contents)
434 }
435
436 /// Authenticate with NATS using a credential str, in the creds file format.
437 ///
438 /// # Example
439 /// ```no_run
440 /// # #[tokio::main]
441 /// # async fn main() -> Result<(), async_nats::ConnectError> {
442 /// let creds = "-----BEGIN NATS USER JWT-----
443 /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
444 /// ------END NATS USER JWT------
445 ///
446 /// ************************* IMPORTANT *************************
447 /// NKEY Seed printed below can be used sign and prove identity.
448 /// NKEYs are sensitive and should be treated as secrets.
449 ///
450 /// -----BEGIN USER NKEY SEED-----
451 /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
452 /// ------END USER NKEY SEED------
453 /// ";
454 ///
455 /// let nc = async_nats::ConnectOptions::with_credentials(creds)
456 /// .expect("failed to parse static creds")
457 /// .connect("connect.ngs.global")
458 /// .await?;
459 /// # Ok(())
460 /// # }
461 /// ```
462 pub fn with_credentials(creds: &str) -> io::Result<Self> {
463 ConnectOptions::default().credentials(creds)
464 }
465
466 /// Use a builder to specify a credentials string, to be used when authenticating against the NATS Server.
467 /// The string should be in the credentials file format.
468 /// This can be used as a way to mix authentication methods.
469 ///
470 /// # Example
471 /// ```no_run
472 /// # #[tokio::main]
473 /// # async fn main() -> Result<(), async_nats::ConnectError> {
474 /// let creds = "-----BEGIN NATS USER JWT-----
475 /// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
476 /// ------END NATS USER JWT------
477 ///
478 /// ************************* IMPORTANT *************************
479 /// NKEY Seed printed below can be used sign and prove identity.
480 /// NKEYs are sensitive and should be treated as secrets.
481 ///
482 /// -----BEGIN USER NKEY SEED-----
483 /// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
484 /// ------END USER NKEY SEED------
485 /// ";
486 ///
487 /// let nc = async_nats::ConnectOptions::new()
488 /// .credentials(creds)
489 /// .expect("failed to parse static creds")
490 /// .connect("connect.ngs.global")
491 /// .await?;
492 /// # Ok(())
493 /// # }
494 /// ```
495 pub fn credentials(self, creds: &str) -> io::Result<Self> {
496 let (jwt, key_pair) = crate::auth_utils::parse_jwt_and_key_from_creds(creds)?;
497 let key_pair = std::sync::Arc::new(key_pair);
498
499 Ok(self.jwt(jwt.to_owned(), move |nonce| {
500 let key_pair = key_pair.clone();
501 async move { key_pair.sign(&nonce).map_err(AuthError::new) }
502 }))
503 }
504
505 /// Loads root certificates by providing the path to them.
506 ///
507 /// # Examples
508 /// ```no_run
509 /// # #[tokio::main]
510 /// # async fn main() -> Result<(), async_nats::ConnectError> {
511 /// let nc = async_nats::ConnectOptions::new()
512 /// .add_root_certificates("mycerts.pem".into())
513 /// .connect("demo.nats.io")
514 /// .await?;
515 /// # Ok(())
516 /// # }
517 /// ```
518 pub fn add_root_certificates(mut self, path: PathBuf) -> ConnectOptions {
519 self.certificates = vec![path];
520 self
521 }
522
523 /// Loads client certificate by providing the path to it.
524 ///
525 /// # Examples
526 /// ```no_run
527 /// # #[tokio::main]
528 /// # async fn main() -> Result<(), async_nats::ConnectError> {
529 /// let nc = async_nats::ConnectOptions::new()
530 /// .add_client_certificate("cert.pem".into(), "key.pem".into())
531 /// .connect("demo.nats.io")
532 /// .await?;
533 /// # Ok(())
534 /// # }
535 /// ```
536 pub fn add_client_certificate(mut self, cert: PathBuf, key: PathBuf) -> ConnectOptions {
537 self.client_cert = Some(cert);
538 self.client_key = Some(key);
539 self
540 }
541
542 /// Sets or disables TLS requirement. If TLS connection is impossible while `options.require_tls(true)` connection will return error.
543 ///
544 /// # Examples
545 /// ```no_run
546 /// # #[tokio::main]
547 /// # async fn main() -> Result<(), async_nats::ConnectError> {
548 /// let nc = async_nats::ConnectOptions::new()
549 /// .require_tls(true)
550 /// .connect("demo.nats.io")
551 /// .await?;
552 /// # Ok(())
553 /// # }
554 /// ```
555 pub fn require_tls(mut self, is_required: bool) -> ConnectOptions {
556 self.tls_required = is_required;
557 self
558 }
559
560 /// Changes how tls connection is established. If `tls_first` is set,
561 /// client will try to establish tls before getting info from the server.
562 /// That requires the server to enable `handshake_first` option in the config.
563 pub fn tls_first(mut self) -> ConnectOptions {
564 self.tls_first = true;
565 self.tls_required = true;
566 self
567 }
568
569 /// Sets how often Client sends PING message to the server.
570 ///
571 /// # Examples
572 /// ```no_run
573 /// # use tokio::time::Duration;
574 /// # #[tokio::main]
575 /// # async fn main() -> Result<(), async_nats::ConnectError> {
576 /// async_nats::ConnectOptions::new()
577 /// .ping_interval(Duration::from_secs(24))
578 /// .connect("demo.nats.io")
579 /// .await?;
580 /// # Ok(())
581 /// # }
582 /// ```
583 pub fn ping_interval(mut self, ping_interval: Duration) -> ConnectOptions {
584 self.ping_interval = ping_interval;
585 self
586 }
587
588 /// Sets `no_echo` option which disables delivering messages that were published from the same
589 /// connection.
590 ///
591 /// # Examples
592 /// ```no_run
593 /// # #[tokio::main]
594 /// # async fn main() -> Result<(), async_nats::ConnectError> {
595 /// async_nats::ConnectOptions::new()
596 /// .no_echo()
597 /// .connect("demo.nats.io")
598 /// .await?;
599 /// # Ok(())
600 /// # }
601 /// ```
602 pub fn no_echo(mut self) -> ConnectOptions {
603 self.no_echo = true;
604 self
605 }
606
607 /// Sets the capacity for `Subscribers`. Exceeding it will trigger `slow consumer` error
608 /// callback and drop messages.
609 /// Default is set to 65536 messages buffer.
610 ///
611 /// # Examples
612 /// ```no_run
613 /// # #[tokio::main]
614 /// # async fn main() -> Result<(), async_nats::ConnectError> {
615 /// async_nats::ConnectOptions::new()
616 /// .subscription_capacity(1024)
617 /// .connect("demo.nats.io")
618 /// .await?;
619 /// # Ok(())
620 /// # }
621 /// ```
622 pub fn subscription_capacity(mut self, capacity: usize) -> ConnectOptions {
623 self.subscription_capacity = capacity;
624 self
625 }
626
627 /// Sets a timeout for the underlying TcpStream connection to avoid hangs and deadlocks.
628 /// Default is set to 5 seconds.
629 ///
630 /// # Examples
631 /// ```no_run
632 /// # #[tokio::main]
633 /// # async fn main() -> Result<(), async_nats::ConnectError> {
634 /// async_nats::ConnectOptions::new()
635 /// .connection_timeout(tokio::time::Duration::from_secs(5))
636 /// .connect("demo.nats.io")
637 /// .await?;
638 /// # Ok(())
639 /// # }
640 /// ```
641 pub fn connection_timeout(mut self, timeout: Duration) -> ConnectOptions {
642 self.connection_timeout = timeout;
643 self
644 }
645
646 /// Sets a timeout for `Client::request`. Default value is set to 10 seconds.
647 ///
648 /// # Examples
649 /// ```no_run
650 /// # #[tokio::main]
651 /// # async fn main() -> Result<(), async_nats::ConnectError> {
652 /// async_nats::ConnectOptions::new()
653 /// .request_timeout(Some(std::time::Duration::from_secs(3)))
654 /// .connect("demo.nats.io")
655 /// .await?;
656 /// # Ok(())
657 /// # }
658 /// ```
659 pub fn request_timeout(mut self, timeout: Option<Duration>) -> ConnectOptions {
660 self.request_timeout = timeout;
661 self
662 }
663
664 /// Registers an asynchronous callback for errors that are received over the wire from the server.
665 ///
666 /// # Examples
667 /// As asynchronous callbacks are still not in `stable` channel, here are some examples how to
668 /// work around this
669 ///
670 /// ## Basic
671 /// If you don't need to move anything into the closure, simple signature can be used:
672 ///
673 /// ```no_run
674 /// # #[tokio::main]
675 /// # async fn main() -> Result<(), async_nats::ConnectError> {
676 /// async_nats::ConnectOptions::new()
677 /// .event_callback(|event| async move {
678 /// println!("event occurred: {}", event);
679 /// })
680 /// .connect("demo.nats.io")
681 /// .await?;
682 /// # Ok(())
683 /// # }
684 /// ```
685 ///
686 /// ## Listening to specific event kind
687 /// ```no_run
688 /// # #[tokio::main]
689 /// # async fn main() -> Result<(), async_nats::ConnectError> {
690 /// async_nats::ConnectOptions::new()
691 /// .event_callback(|event| async move {
692 /// match event {
693 /// async_nats::Event::Disconnected => println!("disconnected"),
694 /// async_nats::Event::Connected => println!("reconnected"),
695 /// async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
696 /// other => println!("other event happened: {}", other),
697 /// }
698 /// })
699 /// .connect("demo.nats.io")
700 /// .await?;
701 /// # Ok(())
702 /// # }
703 /// ```
704 ///
705 /// ## Advanced
706 /// If you need to move something into the closure, here's an example how to do that
707 ///
708 /// ```no_run
709 /// # #[tokio::main]
710 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
711 /// let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
712 /// async_nats::ConnectOptions::new()
713 /// .event_callback(move |event| {
714 /// let tx = tx.clone();
715 /// async move {
716 /// tx.send(event).await.unwrap();
717 /// }
718 /// })
719 /// .connect("demo.nats.io")
720 /// .await?;
721 /// # Ok(())
722 /// # }
723 /// ```
724 pub fn event_callback<F, Fut>(mut self, cb: F) -> ConnectOptions
725 where
726 F: Fn(Event) -> Fut + Send + Sync + 'static,
727 Fut: Future<Output = ()> + 'static + Send + Sync,
728 {
729 self.event_callback = Some(CallbackArg1::<Event, ()>(Box::new(move |event| {
730 Box::pin(cb(event))
731 })));
732 self
733 }
734
735 /// Registers a callback for a custom reconnect delay handler that can be used to define a backoff duration strategy.
736 ///
737 /// # Examples
738 /// ```no_run
739 /// # #[tokio::main]
740 /// # async fn main() -> Result<(), async_nats::ConnectError> {
741 /// async_nats::ConnectOptions::new()
742 /// .reconnect_delay_callback(|attempts| {
743 /// println!("no of attempts: {attempts}");
744 /// std::time::Duration::from_millis(std::cmp::min((attempts * 100) as u64, 8000))
745 /// })
746 /// .connect("demo.nats.io")
747 /// .await?;
748 /// # Ok(())
749 /// # }
750 /// ```
751 pub fn reconnect_delay_callback<F>(mut self, cb: F) -> ConnectOptions
752 where
753 F: Fn(usize) -> Duration + Send + Sync + 'static,
754 {
755 self.reconnect_delay_callback = Box::new(cb);
756 self
757 }
758
759 /// By default, Client dispatches op's to the Client onto the channel with capacity of 128.
760 /// This option enables overriding it.
761 ///
762 /// # Examples
763 /// ```
764 /// # #[tokio::main]
765 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
766 /// async_nats::ConnectOptions::new()
767 /// .client_capacity(256)
768 /// .connect("demo.nats.io")
769 /// .await?;
770 /// # Ok(())
771 /// # }
772 /// ```
773 pub fn client_capacity(mut self, capacity: usize) -> ConnectOptions {
774 self.sender_capacity = capacity;
775 self
776 }
777
778 /// Sets custom prefix instead of default `_INBOX`.
779 ///
780 /// # Examples
781 ///
782 /// ```
783 /// # #[tokio::main]
784 /// # async fn main() -> Result<(), async_nats::Error> {
785 /// async_nats::ConnectOptions::new()
786 /// .custom_inbox_prefix("CUSTOM")
787 /// .connect("demo.nats.io")
788 /// .await?;
789 /// # Ok(())
790 /// # }
791 /// ```
792 pub fn custom_inbox_prefix<T: ToString>(mut self, prefix: T) -> ConnectOptions {
793 self.inbox_prefix = prefix.to_string();
794 self
795 }
796
797 /// Sets the name for the client.
798 ///
799 /// # Examples
800 /// ```
801 /// # #[tokio::main]
802 /// # async fn main() -> Result<(), async_nats::Error> {
803 /// async_nats::ConnectOptions::new()
804 /// .name("rust-service")
805 /// .connect("demo.nats.io")
806 /// .await?;
807 /// # Ok(())
808 /// # }
809 /// ```
810 pub fn name<T: ToString>(mut self, name: T) -> ConnectOptions {
811 self.name = Some(name.to_string());
812 self
813 }
814
815 /// By default, [`ConnectOptions::connect`] will return an error if
816 /// the connection to the server cannot be established.
817 ///
818 /// Setting `retry_on_initial_connect` makes the client
819 /// establish the connection in the background.
820 pub fn retry_on_initial_connect(mut self) -> ConnectOptions {
821 self.retry_on_initial_connect = true;
822 self
823 }
824
825 /// Specifies the number of consecutive reconnect attempts the client will
826 /// make before giving up. This is useful for preventing zombie services
827 /// from endlessly reaching the servers, but it can also be a footgun and
828 /// surprise for users who do not expect that the client can give up
829 /// entirely.
830 ///
831 /// Pass `None` or `0` for no limit.
832 ///
833 /// # Examples
834 /// ```
835 /// # #[tokio::main]
836 /// # async fn main() -> Result<(), async_nats::Error> {
837 /// async_nats::ConnectOptions::new()
838 /// .max_reconnects(None)
839 /// .connect("demo.nats.io")
840 /// .await?;
841 /// # Ok(())
842 /// # }
843 /// ```
844 pub fn max_reconnects<T: Into<Option<usize>>>(mut self, max_reconnects: T) -> ConnectOptions {
845 let val: Option<usize> = max_reconnects.into();
846 self.max_reconnects = if val == Some(0) { None } else { val };
847 self
848 }
849
850 /// By default, a server may advertise other servers in the cluster known to it.
851 /// By setting this option, the client will ignore the advertised servers.
852 /// This may be useful if the client may not be able to reach them.
853 pub fn ignore_discovered_servers(mut self) -> ConnectOptions {
854 self.ignore_discovered_servers = true;
855 self
856 }
857
858 /// By default, client will pick random server to which it will try connect to.
859 /// This option disables that feature, forcing it to always respect the order
860 /// in which server addresses were passed.
861 pub fn retain_servers_order(mut self) -> ConnectOptions {
862 self.retain_servers_order = true;
863 self
864 }
865
866 /// Allows passing custom rustls tls config.
867 ///
868 /// # Examples
869 /// ```
870 /// # #[tokio::main]
871 /// # async fn main() -> Result<(), async_nats::Error> {
872 /// let mut root_store = async_nats::rustls::RootCertStore::empty();
873 ///
874 /// root_store.add_parsable_certificates(rustls_native_certs::load_native_certs()?);
875 ///
876 /// let tls_client = async_nats::rustls::ClientConfig::builder()
877 /// .with_root_certificates(root_store)
878 /// .with_no_client_auth();
879 ///
880 /// let client = async_nats::ConnectOptions::new()
881 /// .require_tls(true)
882 /// .tls_client_config(tls_client)
883 /// .connect("tls://demo.nats.io")
884 /// .await?;
885 ///
886 /// # Ok(())
887 /// # }
888 /// ```
889 pub fn tls_client_config(mut self, config: rustls::ClientConfig) -> ConnectOptions {
890 self.tls_client_config = Some(config);
891 self
892 }
893
894 /// Sets the initial capacity of the read buffer. Which is a buffer used to gather partial
895 /// protocol messages.
896 ///
897 /// # Examples
898 /// ```
899 /// # #[tokio::main]
900 /// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
901 /// async_nats::ConnectOptions::new()
902 /// .read_buffer_capacity(65535)
903 /// .connect("demo.nats.io")
904 /// .await?;
905 /// # Ok(())
906 /// # }
907 /// ```
908 pub fn read_buffer_capacity(mut self, size: u16) -> ConnectOptions {
909 self.read_buffer_capacity = size;
910 self
911 }
912}
913
/// Boxed, type-erased asynchronous callback: takes one argument of type `A`
/// and returns a boxed future resolving to `T`.
pub(crate) type AsyncCallbackArg1<A, T> =
    Box<dyn Fn(A) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>> + Send + Sync>;

/// Newtype around [`AsyncCallbackArg1`] that adds a convenient `call` method
/// and a `Debug` implementation.
pub(crate) struct CallbackArg1<A, T>(AsyncCallbackArg1<A, T>);

impl<A, T> CallbackArg1<A, T> {
    /// Invokes the wrapped callback with `arg` and awaits the future it returns.
    pub(crate) async fn call(&self, arg: A) -> T {
        let pending = (self.0)(arg);
        pending.await
    }
}

impl<A, T> fmt::Debug for CallbackArg1<A, T> {
    /// Prints the fixed text `callback`; the closure itself cannot be inspected.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("callback")
    }
}
930
/// Error report from signing callback.
///
/// Wraps a plain message; both `Display` and `Debug` render it as
/// `AuthError(<message>)`.
// A dedicated string-backed error keeps the type `Clone` and comparable,
// which `std::io::Error` is not.
#[derive(Clone, PartialEq)]
pub struct AuthError(String);

impl AuthError {
    /// Creates an error whose message is the string form of `s`.
    pub fn new(s: impl ToString) -> Self {
        Self(s.to_string())
    }
}

impl std::fmt::Display for AuthError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Write straight into the formatter instead of building a temporary `String`.
        write!(f, "AuthError({})", self.0)
    }
}

impl std::fmt::Debug for AuthError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Intentionally the same text as `Display`.
        write!(f, "AuthError({})", self.0)
    }
}

impl std::error::Error for AuthError {}