// fuel_streams_core/stream/stream_encoding.rs
use std::fmt::Debug;
use async_trait::async_trait;
use fuel_data_parser::{DataParseable, DataParser};
use serde::{Deserialize, Serialize};
/// Envelope that carries a payload together with its subject and a timestamp.
///
/// This is the value handed to the data parser by `StreamEncoder::encode` and
/// returned by `StreamEncoder::decode_raw`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct StreamData<T> {
    /// Subject string associated with the payload.
    pub subject: String,
    /// Timestamp as text; `StreamData::new` fills it with the current UTC time
    /// in RFC 3339 form at seconds precision.
    pub timestamp: String,
    /// The wrapped value.
    pub payload: T,
}
impl<T> StreamData<T>
where
    T: serde::de::DeserializeOwned + Clone,
{
    /// Wraps `payload` in a new envelope for `subject`, stamped with the
    /// current UTC time.
    ///
    /// The timestamp is stored as an RFC 3339 string with seconds precision
    /// and a `Z` suffix.
    pub fn new(subject: &str, payload: T) -> Self {
        let timestamp =
            chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);

        Self {
            subject: subject.to_owned(),
            timestamp,
            payload,
        }
    }

    /// Returns the stored timestamp as a millisecond count.
    ///
    /// The `timestamp` field is parsed as RFC 3339. If parsing fails, the
    /// current UTC time is used instead, so this never returns an error.
    /// Timestamps produced by [`StreamData::new`] only carry seconds
    /// precision.
    #[cfg(feature = "bench-helpers")]
    pub fn ts_as_millis(&self) -> u128 {
        use chrono::{DateTime, Utc};

        match DateTime::parse_from_rfc3339(&self.timestamp) {
            Ok(parsed) => parsed.timestamp_millis() as u128,
            // Best-effort fallback: an unparsable timestamp yields "now".
            Err(_) => Utc::now().timestamp_millis() as u128,
        }
    }
}
#[async_trait]
pub trait StreamEncoder: DataParseable {
async fn encode(&self, subject: &str) -> Vec<u8> {
let data = StreamData::new(subject, self.clone());
Self::data_parser()
.encode(&data)
.await
.expect("Streamable must encode correctly")
}
async fn decode(encoded: Vec<u8>) -> Self {
Self::decode_raw(encoded).await.payload
}
async fn decode_raw(encoded: Vec<u8>) -> StreamData<Self> {
Self::data_parser()
.decode(&encoded)
.await
.expect("Streamable must decode correctly")
}
fn data_parser() -> DataParser {
DataParser::default()
}
}