// moq_transfork/model/track.rs
use tokio::sync::watch;
use super::{Group, GroupConsumer, GroupProducer, Path};
pub use crate::message::GroupOrder;
use crate::Error;
use std::{cmp::Ordering, fmt, ops, sync::Arc, time};
/// Static description of a track: its path plus delivery settings.
///
/// `Clone`, `PartialEq` and `Eq` are derived, so two tracks compare equal only when all
/// four fields are equal.
#[derive(Clone, PartialEq, Eq)]
pub struct Track {
/// Path identifying the track; this is the only field printed by the `Debug` impl.
pub path: Path,
/// Priority of the track. NOTE(review): the ordering convention (whether higher or lower
/// wins) is not visible in this file — confirm where the field is consumed.
pub priority: i8,
/// Requested ordering of groups (see `GroupOrder`); how it is applied is decided by the
/// code that reads this field, outside this file.
pub group_order: GroupOrder,
/// Expiry duration for groups. NOTE(review): the exact expiry semantics are defined by
/// the code that reads this field, not here.
pub group_expires: time::Duration,
}
impl Track {
pub fn new(path: Path) -> Self {
Self {
path,
..Default::default()
}
}
pub fn build() -> TrackBuilder {
TrackBuilder::new()
}
pub fn produce(self) -> (TrackProducer, TrackConsumer) {
let (send, recv) = watch::channel(TrackState::default());
let info = Arc::new(self);
let writer = TrackProducer::new(send, info.clone());
let reader = TrackConsumer::new(recv, info);
(writer, reader)
}
}
/// Debug output for a track is delegated entirely to its `path`; `priority`, `group_order`
/// and `group_expires` are not printed.
impl fmt::Debug for Track {
// Forwards the formatter to the `fmt` method of `self.path` and returns its result.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.path.fmt(f)
}
}
impl Default for Track {
fn default() -> Self {
Self {
path: Default::default(),
priority: 0,
group_order: GroupOrder::Desc,
group_expires: time::Duration::ZERO,
}
}
}
/// Chainable builder for a [`Track`].
///
/// Each setter takes the builder by value, updates one field of the wrapped track and
/// returns the builder again.
pub struct TrackBuilder {
/// The track being assembled; starts out as `Track::default()`.
track: Track,
}
impl Default for TrackBuilder {
fn default() -> Self {
Self::new()
}
}
impl TrackBuilder {
pub fn new() -> Self {
Self {
track: Default::default(),
}
}
pub fn path<T: ToString>(mut self, part: T) -> Self {
self.track.path = self.track.path.push(part);
self
}
pub fn priority(mut self, priority: i8) -> Self {
self.track.priority = priority;
self
}
pub fn group_order(mut self, order: GroupOrder) -> Self {
self.track.group_order = order;
self
}
pub fn group_expires(mut self, expires: time::Duration) -> Self {
self.track.group_expires = expires;
self
}
pub fn produce(self) -> (TrackProducer, TrackConsumer) {
self.track.produce()
}
pub fn into(self) -> Track {
self.track
}
}
impl From<TrackBuilder> for Track {
fn from(builder: TrackBuilder) -> Self {
builder.track
}
}
/// Value carried by the `watch` channel that links a `TrackProducer` to its
/// `TrackConsumer`s.
struct TrackState {
/// Consumer handle for the most recently published group, or `None` before the first one.
latest: Option<GroupConsumer>,
/// `Ok(())` while the track is open; `Err(reason)` once `TrackProducer::close` has run.
closed: Result<(), Error>,
}
impl Default for TrackState {
    /// Initial state: no group published yet and no close error recorded.
    fn default() -> Self {
        let latest = None;
        let closed = Ok(());
        Self { latest, closed }
    }
}
/// Write side of a track: publishes new groups and can close the track with an error.
///
/// `Clone` is derived; a clone holds another handle to the same `Arc<Track>` and a clone of
/// the same `watch::Sender`.
#[derive(Clone)]
pub struct TrackProducer {
/// Shared track description; also reachable through the `Deref` impl.
pub info: Arc<Track>,
/// Sending half of the watch channel holding the latest group and the close status.
state: watch::Sender<TrackState>,
}
impl TrackProducer {
fn new(state: watch::Sender<TrackState>, info: Arc<Track>) -> Self {
Self { info, state }
}
pub fn create_group(&mut self, sequence: u64) -> GroupProducer {
let group = Group::new(sequence);
let (writer, reader) = group.produce();
self.state.send_if_modified(|state| {
if let Some(latest) = &state.latest {
match reader.sequence.cmp(&latest.sequence) {
Ordering::Less => return false, Ordering::Equal => return false, Ordering::Greater => (),
}
}
state.latest = Some(reader);
true
});
writer
}
pub fn append_group(&mut self) -> GroupProducer {
let sequence = self
.state
.borrow()
.latest
.as_ref()
.map_or(0, |group| group.sequence + 1);
self.create_group(sequence)
}
pub fn close(self, err: Error) {
self.state.send_modify(|state| {
state.closed = Err(err);
});
}
pub fn subscribe(&self) -> TrackConsumer {
TrackConsumer::new(self.state.subscribe(), self.info.clone())
}
pub async fn unused(&self) {
self.state.closed().await
}
}
/// Lets a `TrackProducer` be used wherever a `&Track` is expected.
impl ops::Deref for TrackProducer {
    type Target = Track;

    /// Borrows the shared track description held in `info`.
    fn deref(&self) -> &Track {
        self.info.as_ref()
    }
}
/// Read side of a track: observes the latest group and the close status.
///
/// `Clone` is derived, so a clone starts with the same `prev` value as the original.
#[derive(Clone)]
pub struct TrackConsumer {
/// Shared track description; also reachable through the `Deref` impl.
pub info: Arc<Track>,
/// Receiving half of the watch channel holding the latest group and the close status.
state: watch::Receiver<TrackState>,
/// Sequence number of the group most recently returned by `next_group`, or `None` if
/// `next_group` has not returned a group yet.
prev: Option<u64>, }
impl TrackConsumer {
fn new(state: watch::Receiver<TrackState>, info: Arc<Track>) -> Self {
Self {
state,
info,
prev: None,
}
}
pub fn get_group(&self, sequence: u64) -> Result<GroupConsumer, Error> {
let state = self.state.borrow();
if let Some(latest) = &state.latest {
if latest.sequence == sequence {
return Ok(latest.clone());
}
}
state.closed.clone()?;
Err(Error::NotFound)
}
pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>, Error> {
let state = match self
.state
.wait_for(|state| state.latest.as_ref().map(|latest| latest.sequence) != self.prev || state.closed.is_err())
.await
{
Ok(state) => state,
Err(_) => return Ok(None),
};
if let Some(group) = state.latest.as_ref() {
if Some(group.sequence) != self.prev {
self.prev = Some(group.sequence);
return Ok(Some(group.clone()));
}
}
Err(state.closed.clone().unwrap_err())
}
pub fn latest_group(&self) -> u64 {
let state = self.state.borrow();
state.latest.as_ref().map(|group| group.sequence).unwrap_or_default()
}
pub async fn closed(&self) -> Result<(), Error> {
match self.state.clone().wait_for(|state| state.closed.is_err()).await {
Ok(state) => state.closed.clone(),
Err(_) => Ok(()),
}
}
}
/// Lets a `TrackConsumer` be used wherever a `&Track` is expected.
impl ops::Deref for TrackConsumer {
    type Target = Track;

    /// Borrows the shared track description held in `info`.
    fn deref(&self) -> &Track {
        self.info.as_ref()
    }
}
/// Debug output for a consumer is delegated to the path of its track; the channel state and
/// `prev` are not printed.
impl fmt::Debug for TrackConsumer {
// Forwards the formatter to the `fmt` method of `self.info.path` and returns its result.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.info.path.fmt(f)
}
}