// fuel_streams_core/stream/stream_impl.rs
#[cfg(any(test, feature = "test-helpers"))]
use std::pin::Pin;
use std::{fmt::Debug, time::Duration};
use async_nats::{
jetstream::{
consumer::AckPolicy,
kv::{self, CreateErrorKind},
stream::{self, LastRawMessageErrorKind, State},
},
RequestErrorKind,
};
use async_trait::async_trait;
use fuel_streams_macros::subject::IntoSubject;
use futures::{future, StreamExt, TryStreamExt};
use tokio::sync::OnceCell;
use super::{error::StreamError, stream_encoding::StreamEncoder};
use crate::{nats::types::*, prelude::NatsClient};
/// Time unit, in seconds, used when computing how long stream entries are kept.
/// Presumably the expected interval between Fuel blocks (inferred from the
/// name only) — TODO confirm.
pub const FUEL_BLOCK_TIME_SECS: u64 = 1;
/// Number of `FUEL_BLOCK_TIME_SECS` intervals in the retention window.
/// `Stream::new` multiplies the two constants to obtain the `max_age` (in
/// seconds) of the key-value bucket it creates.
pub const MAX_RETENTION_BLOCKS: u64 = 100;
/// A payload type that can be stored in and read back from a [`Stream`].
///
/// `Stream` reaches `encode`/`decode` through the `StreamEncoder` supertrait
/// bound; this trait only adds the per-type constants that `Stream` reads.
#[async_trait]
pub trait Streamable: StreamEncoder {
/// Logical name of the stream. `Stream::new` passes it to the client
/// namespace's `stream_name` to derive the key-value bucket name.
const NAME: &'static str;
/// Subject patterns associated with this payload type. Within this file
/// they are used as the consumer filter subjects of the test-helper
/// `catchup` method.
const WILDCARD_LIST: &'static [&'static str];
}
/// Typed handle over the JetStream key-value store that holds payloads of
/// type `S`.
#[derive(Debug, Clone)]
pub struct Stream<S: Streamable> {
/// Key-value store backing this stream; publishing creates keys in it and
/// consumers are created on its underlying `stream`.
store: kv::Store,
/// Ties the handle to its payload type without storing a value of `S`.
_marker: std::marker::PhantomData<S>,
}
impl<S: Streamable> Stream<S> {
/// Builds a [`Stream`] for `S` on the given client.
///
/// Despite its name, this function has never cached anything. The previous
/// implementation went through an associated `const` holding a `OnceCell`,
/// and a `const` item is instantiated afresh at every use site, so each
/// call initialised a brand-new cell by running `Stream::new`. That dead
/// cell, the `clippy::declare_interior_mutable_const` suppression that hid
/// it, and the two redundant clones have been removed; the observable
/// behaviour is unchanged.
///
/// A real per-type cache would need storage keyed by `S` (Rust has no
/// generic `static`s) plus a decision on how handles built from different
/// clients should be shared, so it is deliberately not attempted here.
///
/// # Panics
///
/// Panics if the key-value store cannot be obtained, exactly as
/// `Stream::new` does.
pub async fn get_or_init(client: &NatsClient) -> Self {
    Self::new(client).await
}
/// Opens (or creates) the key-value bucket backing this stream type and
/// wraps it in a [`Stream`].
///
/// The bucket name is derived from the client's namespace and `S::NAME`.
/// The bucket is configured with file storage, `history: 1`, compression
/// enabled and a `max_age` of
/// `FUEL_BLOCK_TIME_SECS * MAX_RETENTION_BLOCKS` seconds; every other
/// setting keeps its default.
///
/// # Panics
///
/// Panics with "Streams must be created" if the client fails to return
/// the store.
pub async fn new(client: &NatsClient) -> Self {
    let bucket = client.namespace.stream_name(S::NAME).to_owned();
    let retention =
        Duration::from_secs(FUEL_BLOCK_TIME_SECS * MAX_RETENTION_BLOCKS);
    let kv_config = kv::Config {
        bucket,
        storage: stream::StorageType::File,
        history: 1,
        compression: true,
        max_age: retention,
        ..Default::default()
    };
    let kv_store = client
        .get_or_create_kv_store(kv_config)
        .await
        .expect("Streams must be created");
    Self {
        store: kv_store,
        _marker: std::marker::PhantomData,
    }
}
/// Publishes the same payload under every subject in `subjects`.
///
/// The individual `publish` calls are driven together through
/// `futures::future::try_join_all`; the per-subject byte counts are
/// discarded and any error it reports is returned to the caller.
pub async fn publish_many(
    &self,
    subjects: &[Box<dyn IntoSubject>],
    payload: &S,
) -> Result<(), StreamError> {
    // `&**boxed` peels `&Box<dyn IntoSubject>` down to `&dyn IntoSubject`.
    let pending = subjects
        .iter()
        .map(|boxed| self.publish(&**boxed, payload));
    future::try_join_all(pending).await.map(|_sizes| ())
}
/// Publishes `payload` under the subject produced by `subject.parse()`.
///
/// Returns whatever `publish_raw` returns for that subject name: the
/// encoded size in bytes on success.
pub async fn publish(
    &self,
    subject: &dyn IntoSubject,
    payload: &S,
) -> Result<usize, StreamError> {
    let parsed = subject.parse();
    self.publish_raw(&parsed, payload).await
}
/// Encodes `payload` and creates it in the key-value store under
/// `subject_name`.
///
/// Returns the length in bytes of the encoded payload. A key that already
/// exists is treated as success (the size is still returned); any other
/// creation error is reported as `StreamError::PublishFailed` carrying the
/// subject name and the underlying error.
pub async fn publish_raw(
    &self,
    subject_name: &str,
    payload: &S,
) -> Result<usize, StreamError> {
    let encoded = payload.encode(subject_name).await;
    let encoded_len = encoded.len();
    if let Err(err) = self.store.create(subject_name, encoded.into()).await {
        // An existing key is not a failure; everything else is.
        if err.kind() != CreateErrorKind::AlreadyExists {
            return Err(StreamError::PublishFailed {
                subject_name: subject_name.to_string(),
                source: err,
            });
        }
    }
    Ok(encoded_len)
}
/// Returns the backing stream's name, the names of its consumers and its
/// state.
///
/// Consumer listing is best-effort: collection stops at the end of the
/// listing or at the first error, and whatever was gathered so far is
/// returned. As a consequence this function currently always returns `Ok`.
///
/// The state is read from `cached_info()`; no fresh info request is made
/// here.
pub async fn get_consumers_and_state(
    &self,
) -> Result<(String, Vec<String>, State), RequestErrorKind> {
    // Create the listing once and keep polling that same stream. Calling
    // `consumer_names()` inside the loop condition builds a new listing on
    // every iteration, which yields the first name again and again and
    // never terminates as soon as one consumer exists.
    let mut consumer_names = self.store.stream.consumer_names();
    let mut consumers = Vec::new();
    while let Ok(Some(consumer)) = consumer_names.try_next().await {
        consumers.push(consumer);
    }
    let state = self.store.stream.cached_info().state;
    let stream_name = self.get_stream_name().to_string();
    Ok((stream_name, consumers, state))
}
/// Returns the `stream_name` recorded on the underlying key-value store.
pub fn get_stream_name(&self) -> &str {
    let name: &str = &self.store.stream_name;
    name
}
/// Watches the key-value store for entries matching `wildcard` and yields
/// their raw bytes.
///
/// Each stream item is `Some(bytes)` for an entry that was received
/// successfully and `None` for an entry that arrived as an error. Failure
/// to start the watch is returned as a `StreamError`.
pub async fn subscribe(
    &self,
    wildcard: &str,
) -> Result<impl futures::Stream<Item = Option<Vec<u8>>>, StreamError> {
    let watcher = self.store.watch(wildcard).await?;
    Ok(watcher.map(|entry| entry.ok().map(|item| item.value.to_vec())))
}
/// Test helper: replays up to `number_of_messages` messages from the
/// backing stream, decoded as `S`.
///
/// A pull consumer is created with `DeliverPolicy::All`,
/// `AckPolicy::None` and every pattern of `S::WILDCARD_LIST` (prefixed for
/// the key-value subject space) as filter subjects. Each yielded item is
/// `Some(decoded)` for a message received successfully and `None` for one
/// that arrived as an error.
#[cfg(feature = "test-helpers")]
pub async fn catchup(
    &self,
    number_of_messages: usize,
) -> Result<
    Pin<Box<dyn futures::Stream<Item = Option<S>> + Send>>,
    StreamError,
> {
    let pull_config = self.prefix_filter_subjects(PullConsumerConfig {
        filter_subjects: self.all_filter_subjects(),
        deliver_policy: DeliverPolicy::All,
        ack_policy: AckPolicy::None,
        ..Default::default()
    });
    let consumer = self.store.stream.create_consumer(pull_config).await?;
    let messages = consumer.messages().await?;
    let decoded = messages.take(number_of_messages).then(|message| async {
        match message {
            Ok(message) => Some(S::decode(message.payload.to_vec()).await),
            Err(_) => None,
        }
    });
    Ok(Box::pin(decoded))
}
pub async fn subscribe_consumer(
&self,
config: SubscribeConsumerConfig,
) -> Result<PullConsumerStream, StreamError> {
let config = PullConsumerConfig {
filter_subjects: config.filter_subjects,
deliver_policy: config.deliver_policy,
ack_policy: AckPolicy::None,
..Default::default()
};
let config = self.prefix_filter_subjects(config);
let consumer = self.store.stream.create_consumer(config).await?;
Ok(consumer.messages().await?)
}
/// Creates a pull consumer on the backing stream from a caller-supplied
/// configuration.
///
/// The configuration is used as given except that its filter subjects are
/// prefixed for the key-value subject space first.
pub async fn create_consumer(
    &self,
    config: PullConsumerConfig,
) -> Result<NatsConsumer<PullConsumerConfig>, StreamError> {
    let prefixed = self.prefix_filter_subjects(config);
    let consumer = self.store.stream.create_consumer(prefixed).await?;
    Ok(consumer)
}
/// Test helper: returns every pattern of `S::WILDCARD_LIST` as an owned
/// `String`, in declaration order.
#[cfg(feature = "test-helpers")]
fn all_filter_subjects(&self) -> Vec<String> {
    S::WILDCARD_LIST
        .iter()
        .copied()
        .map(String::from)
        .collect()
}
/// Test helper: reports whether nothing has been published for `wildcard`.
///
/// Returns `true` only when `get_last_published` succeeds with `None`; a
/// lookup error yields `false`, the same as a found message.
#[cfg(feature = "test-helpers")]
pub async fn is_empty(&self, wildcard: &str) -> bool
where
    S: for<'de> serde::Deserialize<'de>,
{
    matches!(self.get_last_published(wildcard).await, Ok(None))
}
/// Fetches and decodes the last raw message stored for `wildcard`.
///
/// The subject is prefixed for the key-value subject space before the
/// lookup. Returns `Ok(Some(payload))` when a message exists, `Ok(None)`
/// when the lookup reports `NoMessageFound`, and converts any other lookup
/// error into a `StreamError`.
pub async fn get_last_published(
    &self,
    wildcard: &str,
) -> Result<Option<S>, StreamError> {
    let subject = Self::prefix_filter_subject(wildcard);
    let lookup = self
        .store
        .stream
        .get_last_raw_message_by_subject(&subject)
        .await;
    let raw = match lookup {
        Ok(raw) => raw,
        Err(err) => {
            return if matches!(
                err.kind(),
                LastRawMessageErrorKind::NoMessageFound
            ) {
                Ok(None)
            } else {
                Err(err.into())
            };
        }
    };
    Ok(Some(S::decode(raw.payload.to_vec()).await))
}
/// Test helper: asserts that the backing stream's configured name is one
/// of `names`.
///
/// # Panics
///
/// Panics if the stream info request fails or if the name is not in the
/// set.
#[cfg(any(test, feature = "test-helpers"))]
pub async fn assert_has_stream(
    &self,
    names: &std::collections::HashSet<String>,
) {
    // `info()` needs a mutable handle, so work on a clone of the stream.
    let mut handle = self.store.stream.clone();
    let stream_info = handle.info().await.unwrap();
    let has_stream = names.contains(&stream_info.config.name);
    assert!(has_stream)
}
/// Rewrites every filter subject of `config` through
/// `prefix_filter_subject` and returns the updated configuration; all
/// other fields are passed through untouched.
fn prefix_filter_subjects(
    &self,
    mut config: PullConsumerConfig,
) -> PullConsumerConfig {
    for subject in &mut config.filter_subjects {
        *subject = Self::prefix_filter_subject(subject.as_str());
    }
    config
}
/// Prepends `$KV.*.` to `subject` and returns the result as an owned
/// `String`.
fn prefix_filter_subject(subject: impl Into<String>) -> String {
    let suffix: String = subject.into();
    ["$KV.*.", suffix.as_str()].concat()
}
/// Test helper: borrows the underlying key-value store.
#[cfg(any(test, feature = "test-helpers"))]
pub fn store(&self) -> &kv::Store {
    let Self { store, .. } = self;
    store
}
}
/// Caller-facing subset of consumer settings accepted by
/// `Stream::subscribe_consumer`.
#[derive(Debug, Clone, Default)]
pub struct SubscribeConsumerConfig {
/// Subjects the consumer should receive. `subscribe_consumer` prefixes
/// each one with `$KV.*.` before creating the consumer, so callers pass
/// them unprefixed.
pub filter_subjects: Vec<String>,
/// Deliver policy copied as-is into the pull consumer configuration.
pub deliver_policy: DeliverPolicy,
}