moq_transfork/model/
group.rsuse bytes::Bytes;
use std::ops;
use tokio::sync::watch;
use crate::Error;
use super::{Frame, FrameConsumer, FrameProducer};
#[derive(Clone, PartialEq, Debug)]
pub struct Group {
pub sequence: u64,
}
impl Group {
pub fn new(sequence: u64) -> Group {
Self { sequence }
}
pub fn produce(self) -> (GroupProducer, GroupConsumer) {
let (send, recv) = watch::channel(GroupState::default());
let writer = GroupProducer::new(send, self.clone());
let reader = GroupConsumer::new(recv, self);
(writer, reader)
}
}
#[derive(Debug)]
struct GroupState {
frames: Vec<FrameConsumer>,
closed: Result<(), Error>,
}
impl Default for GroupState {
fn default() -> Self {
Self {
frames: Vec::new(),
closed: Ok(()),
}
}
}
#[derive(Clone, Debug)]
pub struct GroupProducer {
state: watch::Sender<GroupState>,
pub info: Group,
}
impl GroupProducer {
fn new(state: watch::Sender<GroupState>, info: Group) -> Self {
Self { state, info }
}
pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
let frame = frame.into();
self.create_frame(frame.len()).write(frame);
}
pub fn create_frame(&mut self, size: usize) -> FrameProducer {
let (writer, reader) = Frame::new(size).produce();
self.state.send_modify(|state| state.frames.push(reader));
writer
}
pub fn frame_count(&self) -> usize {
self.state.borrow().frames.len()
}
pub fn subscribe(&self) -> GroupConsumer {
GroupConsumer::new(self.state.subscribe(), self.info.clone())
}
pub fn close(self, err: Error) {
self.state.send_modify(|state| {
state.closed = Err(err);
});
}
}
impl ops::Deref for GroupProducer {
type Target = Group;
fn deref(&self) -> &Self::Target {
&self.info
}
}
#[derive(Clone, Debug)]
pub struct GroupConsumer {
state: watch::Receiver<GroupState>,
pub info: Group,
index: usize,
}
impl GroupConsumer {
fn new(state: watch::Receiver<GroupState>, group: Group) -> Self {
Self {
state,
info: group,
index: 0,
}
}
pub async fn read_frame(&mut self) -> Result<Option<Bytes>, Error> {
Ok(match self.next_frame().await? {
Some(mut reader) => Some(reader.read_all().await?),
None => None,
})
}
pub async fn next_frame(&mut self) -> Result<Option<FrameConsumer>, Error> {
loop {
{
let state = self.state.borrow_and_update();
if let Some(frame) = state.frames.get(self.index).cloned() {
self.index += 1;
return Ok(Some(frame));
}
state.closed.clone()?;
}
if self.state.changed().await.is_err() {
return Ok(None);
}
}
}
pub fn frame_index(&self) -> usize {
self.index
}
pub fn frame_count(&self) -> usize {
self.state.borrow().frames.len()
}
pub async fn closed(&self) -> Result<(), Error> {
match self.state.clone().wait_for(|state| state.closed.is_err()).await {
Ok(state) => state.closed.clone(),
Err(_) => Ok(()),
}
}
}
impl ops::Deref for GroupConsumer {
type Target = Group;
fn deref(&self) -> &Self::Target {
&self.info
}
}