fuel_streams_core/nats/nats_client.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
use async_nats::{
error,
jetstream::{context::CreateKeyValueErrorKind, kv},
};
use tracing::info;
use super::{types::*, NatsClientOpts, NatsError, NatsNamespace};
/// NatsClient is a wrapper around the NATS client that provides additional functionality
/// geared towards fuel-streaming use-cases
///
/// # Examples
///
/// Creating a new `NatsClient`:
///
/// ```no_run
/// use fuel_streams_core::prelude::*;
///
/// async fn example() -> BoxedResult<()> {
/// let opts = NatsClientOpts::new("nats://localhost:4222");
/// let client = NatsClient::connect(&opts).await?;
/// Ok(())
/// }
/// ```
///
/// Creating a key-value store:
///
/// ```no_run
/// use fuel_streams_core::prelude::*;
/// use async_nats::jetstream::kv;
///
/// async fn example() -> BoxedResult<()> {
/// let opts = NatsClientOpts::new("nats://localhost:4222");
/// let client = NatsClient::connect(&opts).await?;
/// let kv_config = kv::Config {
/// bucket: "my-bucket".into(),
/// ..Default::default()
/// };
///
/// let store = client.get_or_create_kv_store(kv_config).await?;
/// Ok(())
/// }
/// ```
#[derive(Debug, Clone)]
pub struct NatsClient {
/// The underlying NATS client
pub nats_client: async_nats::Client,
/// The JetStream context for this client
pub jetstream: JetStreamContext,
/// The namespace used for this client
pub namespace: NatsNamespace,
/// The options used to create this client
pub opts: NatsClientOpts,
}
impl NatsClient {
pub async fn connect(opts: &NatsClientOpts) -> Result<Self, NatsError> {
let url = &opts.url;
let namespace = opts.namespace.clone();
let nats_client =
opts.connect_opts().connect(&url).await.map_err(|e| {
NatsError::ConnectionError {
url: url.to_string(),
source: e,
}
})?;
let jetstream = async_nats::jetstream::new(nats_client.to_owned());
info!("Connected to NATS server at {}", url);
Ok(Self {
nats_client,
jetstream,
opts: opts.to_owned(),
namespace,
})
}
pub async fn get_or_create_kv_store(
&self,
options: kv::Config,
) -> Result<kv::Store, error::Error<CreateKeyValueErrorKind>> {
let bucket = options.bucket.clone();
let store = self.jetstream.get_key_value(&bucket).await;
let store = match store {
Ok(store) => store,
Err(_) => self.jetstream.create_key_value(options).await?,
};
Ok(store)
}
pub fn is_connected(&self) -> bool {
self.state() == ConnectionState::Connected
}
fn state(&self) -> ConnectionState {
self.nats_client.connection_state()
}
}