use std::{pin::pin, sync::Arc, time::Duration};
use anyhow::Result;
use deadpool::managed::Manager;
use redis::AsyncCommands;
use tokio::time;
use tokio_stream::{Stream, StreamExt};
use tracing::error;
use crate::session::Session;
const STORAGE_SYNC_INTERVAL: Duration = Duration::from_secs(10);
const STORAGE_EXPIRY: Duration = Duration::from_secs(300);
fn set_opts() -> redis::SetOptions {
redis::SetOptions::default()
.with_expiration(redis::SetExpiry::PX(STORAGE_EXPIRY.as_millis() as usize))
}
#[derive(Clone)]
pub struct StorageMesh {
redis: deadpool_redis::Pool,
host: Option<String>,
}
impl StorageMesh {
pub fn new(redis_url: &str, host: Option<&str>) -> Result<Self> {
let redis = deadpool_redis::Config::from_url(redis_url)
.builder()?
.max_size(4)
.wait_timeout(Some(Duration::from_secs(5)))
.runtime(deadpool_redis::Runtime::Tokio1)
.build()?;
Ok(Self {
redis,
host: host.map(|s| s.to_string()),
})
}
pub fn host(&self) -> Option<&str> {
self.host.as_deref()
}
pub async fn get_owner(&self, name: &str) -> Result<Option<String>> {
let mut conn = self.redis.get().await?;
let (owner, closed) = redis::pipe()
.get(format!("session:{{{name}}}:owner"))
.get(format!("session:{{{name}}}:closed"))
.query_async(&mut conn)
.await?;
if closed {
Ok(None)
} else {
Ok(owner)
}
}
pub async fn get_owner_snapshot(
&self,
name: &str,
) -> Result<(Option<String>, Option<Vec<u8>>)> {
let mut conn = self.redis.get().await?;
let (owner, snapshot, closed) = redis::pipe()
.get(format!("session:{{{name}}}:owner"))
.get(format!("session:{{{name}}}:snapshot"))
.get(format!("session:{{{name}}}:closed"))
.query_async(&mut conn)
.await?;
if closed {
Ok((None, None))
} else {
Ok((owner, snapshot))
}
}
pub async fn background_sync(&self, name: &str, session: Arc<Session>) {
let mut interval = time::interval(STORAGE_SYNC_INTERVAL);
interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = interval.tick() => {}
_ = session.sync_now_wait() => {}
_ = session.terminated() => break,
}
let mut conn = match self.redis.get().await {
Ok(conn) => conn,
Err(err) => {
error!(?err, "failed to connect to redis for sync");
continue;
}
};
let snapshot = match session.snapshot() {
Ok(snapshot) => snapshot,
Err(err) => {
error!(?err, "failed to snapshot session {name}");
continue;
}
};
let mut pipe = redis::pipe();
if let Some(host) = &self.host {
pipe.set_options(format!("session:{{{name}}}:owner"), host, set_opts());
}
pipe.set_options(format!("session:{{{name}}}:snapshot"), snapshot, set_opts());
match pipe.query_async(&mut conn).await {
Ok(()) => {}
Err(err) => error!(?err, "failed to sync session {name}"),
}
}
}
pub async fn mark_closed(&self, name: &str) -> Result<()> {
let mut conn = self.redis.get().await?;
let (owner,): (Option<String>,) = redis::pipe()
.get_del(format!("session:{{{name}}}:owner"))
.del(format!("session:{{{name}}}:snapshot"))
.ignore()
.set_options(format!("session:{{{name}}}:closed"), true, set_opts())
.ignore()
.query_async(&mut conn)
.await?;
if let Some(owner) = owner {
self.notify_transfer(name, &owner).await?;
}
Ok(())
}
pub async fn notify_transfer(&self, name: &str, host: &str) -> Result<()> {
let mut conn = self.redis.get().await?;
() = conn.publish(format!("transfers:{host}"), name).await?;
Ok(())
}
pub fn listen_for_transfers(&self) -> impl Stream<Item = String> + Send + '_ {
async_stream::stream! {
let Some(host) = &self.host else {
return;
};
loop {
let conn = match self.redis.manager().create().await {
Ok(conn) => conn,
Err(err) => {
error!(?err, "failed to connect to redis for pub/sub");
time::sleep(Duration::from_secs(5)).await;
continue;
}
};
let mut pubsub = conn.into_pubsub();
if let Err(err) = pubsub.subscribe(format!("transfers:{host}")).await {
error!(?err, "failed to subscribe to transfers");
time::sleep(Duration::from_secs(1)).await;
continue;
}
let mut msg_stream = pin!(pubsub.into_on_message());
while let Some(msg) = msg_stream.next().await {
match msg.get_payload::<String>() {
Ok(payload) => yield payload,
Err(err) => {
error!(?err, "failed to parse transfers message");
continue;
}
};
}
}
}
}
}