fuel_streams_core/nats/nats_client_opts.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
use std::time::Duration;
use async_nats::ConnectOptions;
use super::NatsNamespace;
#[derive(Debug, Clone, Default)]
pub enum NatsUserRole {
Admin,
#[default]
Default,
}
/// Represents options for configuring a NATS client.
///
/// # Examples
///
/// Creating a new `NatsClientOpts` instance:
///
/// ```
/// use fuel_streams_core::nats::NatsClientOpts;
///
/// let opts = NatsClientOpts::new("nats://localhost:4222");
/// ```
///
/// Creating a public `NatsClientOpts`:
///
/// ```
/// use fuel_streams_core::nats::NatsClientOpts;
///
/// let opts = NatsClientOpts::default_opts("nats://localhost:4222");
/// ```
///
/// Modifying `NatsClientOpts`:
///
/// ```
/// use fuel_streams_core::nats::{NatsClientOpts, NatsUserRole};
///
/// let opts = NatsClientOpts::new("nats://localhost:4222")
/// .with_role(NatsUserRole::Admin)
/// .with_timeout(10);
/// ```
#[derive(Debug, Clone)]
pub struct NatsClientOpts {
/// The URL of the NATS server to connect to.
pub(crate) url: String,
/// The role of the user connecting to the NATS server (Admin or Public).
pub(crate) role: NatsUserRole,
/// The namespace used as a prefix for NATS streams, consumers, and subject names.
pub(crate) namespace: NatsNamespace,
/// The timeout in seconds for NATS operations.
pub(crate) timeout_secs: u64,
}
impl NatsClientOpts {
pub fn new(url: impl ToString) -> Self {
Self {
url: url.to_string(),
role: NatsUserRole::default(),
namespace: NatsNamespace::default(),
timeout_secs: 5,
}
}
pub fn default_opts(url: impl ToString) -> Self {
Self::new(url).with_role(NatsUserRole::Default)
}
#[cfg(any(test, feature = "test-helpers"))]
pub fn admin_opts(url: impl ToString) -> Self {
Self::new(url).with_role(NatsUserRole::Admin)
}
pub fn with_role(self, role: NatsUserRole) -> Self {
Self { role, ..self }
}
#[cfg(any(test, feature = "test-helpers"))]
pub fn with_rdn_namespace(self) -> Self {
let namespace = format!(r"namespace-{}", Self::random_int());
self.with_namespace(&namespace)
}
#[cfg(any(test, feature = "test-helpers"))]
pub fn with_namespace(self, namespace: &str) -> Self {
let namespace = NatsNamespace::Custom(namespace.to_string());
Self { namespace, ..self }
}
pub fn with_timeout(self, secs: u64) -> Self {
Self {
timeout_secs: secs,
..self
}
}
pub(super) fn connect_opts(&self) -> ConnectOptions {
let (user, pass) = match self.role {
NatsUserRole::Admin => (
Some("admin".to_string()),
Some(
dotenvy::var("NATS_ADMIN_PASS")
.expect("`NATS_ADMIN_PASS` env must be set"),
),
),
NatsUserRole::Default => {
(Some("default_user".to_string()), Some("".to_string()))
}
};
match (user, pass) {
(Some(user), Some(pass)) => {
ConnectOptions::with_user_and_password(user, pass)
.connection_timeout(Duration::from_secs(self.timeout_secs))
.max_reconnects(1)
.name(Self::conn_id())
}
_ => ConnectOptions::new()
.connection_timeout(Duration::from_secs(self.timeout_secs))
.max_reconnects(1)
.name(Self::conn_id()),
}
}
// This will be useful for debugging and monitoring connections
fn conn_id() -> String {
format!(r"connection-{}", Self::random_int())
}
fn random_int() -> u32 {
use rand::Rng;
rand::thread_rng().gen()
}
}