async_nats

Struct ConnectOptions

Source
pub struct ConnectOptions { /* private fields */ }
Expand description

Connect options. Used to connect with NATS when custom config is needed.

§Examples

let mut options = async_nats::ConnectOptions::new()
    .require_tls(true)
    .ping_interval(std::time::Duration::from_secs(10))
    .connect("demo.nats.io")
    .await?;

Implementations§

Source§

impl ConnectOptions

Source

pub fn new() -> ConnectOptions

Enables customization of NATS connection.

§Examples
let mut options = async_nats::ConnectOptions::new()
    .require_tls(true)
    .ping_interval(std::time::Duration::from_secs(10))
    .connect("demo.nats.io")
    .await?;
Source

pub async fn connect<A: ToServerAddrs>( self, addrs: A, ) -> Result<Client, ConnectError>

Connect to the NATS Server leveraging all passed options.

§Examples
let nc = async_nats::ConnectOptions::new()
    .require_tls(true)
    .connect("demo.nats.io")
    .await?;
§Pass multiple URLs.
#[tokio::main]
use async_nats::ServerAddr;
let client = async_nats::connect(vec![
    "demo.nats.io".parse::<ServerAddr>()?,
    "other.nats.io".parse::<ServerAddr>()?,
])
.await
.unwrap();
Source

pub fn with_auth_callback<F, Fut>(callback: F) -> Self
where F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<Auth, AuthError>> + 'static + Send + Sync,

Creates a builder with a custom auth callback to be used when authenticating against the NATS Server. Requires an asynchronous function that accepts nonce and returns Auth. It will overwrite all other auth methods used.

§Example
async_nats::ConnectOptions::with_auth_callback(move |_| async move {
    let mut auth = async_nats::Auth::new();
    auth.username = Some("derek".to_string());
    auth.password = Some("s3cr3t".to_string());
    Ok(auth)
})
.connect("demo.nats.io")
.await?;
Source

pub fn with_token(token: String) -> Self

Authenticate against NATS Server with the provided token.

§Examples
let nc = async_nats::ConnectOptions::with_token("t0k3n!".into())
    .connect("demo.nats.io")
    .await?;
Source

pub fn token(self, token: String) -> Self

Use a builder to specify a token, to be used when authenticating against the NATS Server. This can be used as a way to mix authentication methods.

§Examples
let nc = async_nats::ConnectOptions::new()
    .token("t0k3n!".into())
    .connect("demo.nats.io")
    .await?;
Source

pub fn with_user_and_password(user: String, pass: String) -> Self

Authenticate against NATS Server with the provided username and password.

§Examples
let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
    .connect("demo.nats.io")
    .await?;
Source

pub fn user_and_password(self, user: String, pass: String) -> Self

Use a builder to specify a username and password, to be used when authenticating against the NATS Server. This can be used as a way to mix authentication methods.

§Examples
let nc = async_nats::ConnectOptions::new()
    .user_and_password("derek".into(), "s3cr3t!".into())
    .connect("demo.nats.io")
    .await?;
Source

pub fn with_nkey(seed: String) -> Self

Authenticate with an NKey. Requires an NKey Seed secret.

§Example
let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
let nc = async_nats::ConnectOptions::with_nkey(seed.into())
    .connect("localhost")
    .await?;
Source

pub fn nkey(self, seed: String) -> Self

Use a builder to specify an NKey, to be used when authenticating against the NATS Server. Requires an NKey Seed Secret. This can be used as a way to mix authentication methods.

§Example
let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
let nc = async_nats::ConnectOptions::new()
    .nkey(seed.into())
    .connect("localhost")
    .await?;
Source

pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self
where F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<Vec<u8>, AuthError>> + 'static + Send + Sync,

Authenticate with a JWT. Requires function to sign the server nonce. The signing function is asynchronous.

§Example
let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
// load jwt from creds file or other secure source
async fn load_jwt() -> std::io::Result<String> {
    todo!();
}
let jwt = load_jwt().await?;
let nc = async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
    let key_pair = key_pair.clone();
    async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
})
.connect("localhost")
.await?;
Source

pub fn jwt<F, Fut>(self, jwt: String, sign_cb: F) -> Self
where F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<Vec<u8>, AuthError>> + 'static + Send + Sync,

Use a builder to specify a JWT, to be used when authenticating against the NATS Server. Requires an asynchronous function to sign the server nonce. This can be used as a way to mix authentication methods.

§Example
let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
// load jwt from creds file or other secure source
async fn load_jwt() -> std::io::Result<String> {
    todo!();
}
let jwt = load_jwt().await?;
let nc = async_nats::ConnectOptions::new()
    .jwt(jwt, move |nonce| {
        let key_pair = key_pair.clone();
        async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
    })
    .connect("localhost")
    .await?;
Source

pub async fn with_credentials_file(path: impl AsRef<Path>) -> Result<Self>

Authenticate with NATS using a .creds file. Open the provided file, load its creds, and perform the desired authentication

§Example
let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds")
    .await?
    .connect("connect.ngs.global")
    .await?;
Source

pub async fn credentials_file(self, path: impl AsRef<Path>) -> Result<Self>

Use a builder to specify a credentials file, to be used when authenticating against the NATS Server. This will open the credentials file and load its credentials. This can be used as a way to mix authentication methods.

§Example
let nc = async_nats::ConnectOptions::new()
    .credentials_file("path/to/my.creds")
    .await?
    .connect("connect.ngs.global")
    .await?;
Source

pub fn with_credentials(creds: &str) -> Result<Self>

Authenticate with NATS using a credential str, in the creds file format.

§Example
let creds = "-----BEGIN NATS USER JWT-----
eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
------END NATS USER JWT------

************************* IMPORTANT *************************
NKEY Seed printed below can be used sign and prove identity.
NKEYs are sensitive and should be treated as secrets.

-----BEGIN USER NKEY SEED-----
SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
------END USER NKEY SEED------
";

let nc = async_nats::ConnectOptions::with_credentials(creds)
    .expect("failed to parse static creds")
    .connect("connect.ngs.global")
    .await?;
Source

pub fn credentials(self, creds: &str) -> Result<Self>

Use a builder to specify a credentials string, to be used when authenticating against the NATS Server. The string should be in the credentials file format. This can be used as a way to mix authentication methods.

§Example
let creds = "-----BEGIN NATS USER JWT-----
eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
------END NATS USER JWT------

************************* IMPORTANT *************************
NKEY Seed printed below can be used sign and prove identity.
NKEYs are sensitive and should be treated as secrets.

-----BEGIN USER NKEY SEED-----
SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
------END USER NKEY SEED------
";

let nc = async_nats::ConnectOptions::new()
    .credentials(creds)
    .expect("failed to parse static creds")
    .connect("connect.ngs.global")
    .await?;
Source

pub fn add_root_certificates(self, path: PathBuf) -> ConnectOptions

Loads root certificates by providing the path to them.

§Examples
let nc = async_nats::ConnectOptions::new()
    .add_root_certificates("mycerts.pem".into())
    .connect("demo.nats.io")
    .await?;
Source

pub fn add_client_certificate( self, cert: PathBuf, key: PathBuf, ) -> ConnectOptions

Loads client certificate by providing the path to it.

§Examples
let nc = async_nats::ConnectOptions::new()
    .add_client_certificate("cert.pem".into(), "key.pem".into())
    .connect("demo.nats.io")
    .await?;
Source

pub fn require_tls(self, is_required: bool) -> ConnectOptions

Sets or disables TLS requirement. If TLS connection is impossible while options.require_tls(true) connection will return error.

§Examples
let nc = async_nats::ConnectOptions::new()
    .require_tls(true)
    .connect("demo.nats.io")
    .await?;
Source

pub fn tls_first(self) -> ConnectOptions

Changes how tls connection is established. If tls_first is set, client will try to establish tls before getting info from the server. That requires the server to enable handshake_first option in the config.

Source

pub fn ping_interval(self, ping_interval: Duration) -> ConnectOptions

Sets how often Client sends PING message to the server.

§Examples
async_nats::ConnectOptions::new()
    .ping_interval(Duration::from_secs(24))
    .connect("demo.nats.io")
    .await?;
Source

pub fn no_echo(self) -> ConnectOptions

Sets no_echo option which disables delivering messages that were published from the same connection.

§Examples
async_nats::ConnectOptions::new()
    .no_echo()
    .connect("demo.nats.io")
    .await?;
Source

pub fn subscription_capacity(self, capacity: usize) -> ConnectOptions

Sets the capacity for Subscribers. Exceeding it will trigger slow consumer error callback and drop messages. Default is set to 65536 messages buffer.

§Examples
async_nats::ConnectOptions::new()
    .subscription_capacity(1024)
    .connect("demo.nats.io")
    .await?;
Source

pub fn connection_timeout(self, timeout: Duration) -> ConnectOptions

Sets a timeout for the underlying TcpStream connection to avoid hangs and deadlocks. Default is set to 5 seconds.

§Examples
async_nats::ConnectOptions::new()
    .connection_timeout(tokio::time::Duration::from_secs(5))
    .connect("demo.nats.io")
    .await?;
Source

pub fn request_timeout(self, timeout: Option<Duration>) -> ConnectOptions

Sets a timeout for Client::request. Default value is set to 10 seconds.

§Examples
async_nats::ConnectOptions::new()
    .request_timeout(Some(std::time::Duration::from_secs(3)))
    .connect("demo.nats.io")
    .await?;
Source

pub fn event_callback<F, Fut>(self, cb: F) -> ConnectOptions
where F: Fn(Event) -> Fut + Send + Sync + 'static, Fut: Future<Output = ()> + 'static + Send + Sync,

Registers an asynchronous callback for errors that are received over the wire from the server.

§Examples

As asynchronous callbacks are still not in stable channel, here are some examples how to work around this

§Basic

If you don’t need to move anything into the closure, simple signature can be used:

async_nats::ConnectOptions::new()
    .event_callback(|event| async move {
        println!("event occurred: {}", event);
    })
    .connect("demo.nats.io")
    .await?;
§Listening to specific event kind
async_nats::ConnectOptions::new()
    .event_callback(|event| async move {
        match event {
            async_nats::Event::Disconnected => println!("disconnected"),
            async_nats::Event::Connected => println!("reconnected"),
            async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
            other => println!("other event happened: {}", other),
        }
    })
    .connect("demo.nats.io")
    .await?;
§Advanced

If you need to move something into the closure, here’s an example how to do that

let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
async_nats::ConnectOptions::new()
    .event_callback(move |event| {
        let tx = tx.clone();
        async move {
            tx.send(event).await.unwrap();
        }
    })
    .connect("demo.nats.io")
    .await?;
Source

pub fn reconnect_delay_callback<F>(self, cb: F) -> ConnectOptions
where F: Fn(usize) -> Duration + Send + Sync + 'static,

Registers a callback for a custom reconnect delay handler that can be used to define a backoff duration strategy.

§Examples
async_nats::ConnectOptions::new()
    .reconnect_delay_callback(|attempts| {
        println!("no of attempts: {attempts}");
        std::time::Duration::from_millis(std::cmp::min((attempts * 100) as u64, 8000))
    })
    .connect("demo.nats.io")
    .await?;
Source

pub fn client_capacity(self, capacity: usize) -> ConnectOptions

By default, Client dispatches op’s to the Client onto the channel with capacity of 128. This option enables overriding it.

§Examples
async_nats::ConnectOptions::new()
    .client_capacity(256)
    .connect("demo.nats.io")
    .await?;
Source

pub fn custom_inbox_prefix<T: ToString>(self, prefix: T) -> ConnectOptions

Sets custom prefix instead of default _INBOX.

§Examples
async_nats::ConnectOptions::new()
    .custom_inbox_prefix("CUSTOM")
    .connect("demo.nats.io")
    .await?;
Source

pub fn name<T: ToString>(self, name: T) -> ConnectOptions

Sets the name for the client.

§Examples
async_nats::ConnectOptions::new()
    .name("rust-service")
    .connect("demo.nats.io")
    .await?;
Source

pub fn retry_on_initial_connect(self) -> ConnectOptions

By default, ConnectOptions::connect will return an error if the connection to the server cannot be established.

Setting retry_on_initial_connect makes the client establish the connection in the background.

Source

pub fn max_reconnects<T: Into<Option<usize>>>( self, max_reconnects: T, ) -> ConnectOptions

Specifies the number of consecutive reconnect attempts the client will make before giving up. This is useful for preventing zombie services from endlessly reaching the servers, but it can also be a footgun and surprise for users who do not expect that the client can give up entirely.

Pass None or 0 for no limit.

§Examples
async_nats::ConnectOptions::new()
    .max_reconnects(None)
    .connect("demo.nats.io")
    .await?;
Source

pub fn ignore_discovered_servers(self) -> ConnectOptions

By default, a server may advertise other servers in the cluster known to it. By setting this option, the client will ignore the advertised servers. This may be useful if the client may not be able to reach them.

Source

pub fn retain_servers_order(self) -> ConnectOptions

By default, client will pick random server to which it will try connect to. This option disables that feature, forcing it to always respect the order in which server addresses were passed.

Source

pub fn tls_client_config(self, config: ClientConfig) -> ConnectOptions

Allows passing custom rustls tls config.

§Examples
let mut root_store = async_nats::rustls::RootCertStore::empty();

root_store.add_parsable_certificates(rustls_native_certs::load_native_certs()?);

let tls_client = async_nats::rustls::ClientConfig::builder()
    .with_root_certificates(root_store)
    .with_no_client_auth();

let client = async_nats::ConnectOptions::new()
    .require_tls(true)
    .tls_client_config(tls_client)
    .connect("tls://demo.nats.io")
    .await?;
Source

pub fn read_buffer_capacity(self, size: u16) -> ConnectOptions

Sets the initial capacity of the read buffer. Which is a buffer used to gather partial protocol messages.

§Examples
async_nats::ConnectOptions::new()
    .read_buffer_capacity(65535)
    .connect("demo.nats.io")
    .await?;

Trait Implementations§

Source§

impl Debug for ConnectOptions

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more
Source§

impl Default for ConnectOptions

Source§

fn default() -> ConnectOptions

Returns the “default value” for a type. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,

Source§

impl<T> MaybeSendSync for T