// moq_transfork/model/frame.rs
use bytes::{Bytes, BytesMut};
use std::ops;
use tokio::sync::watch;
use crate::Error;
/// Static description of a frame: the information known before any payload
/// chunk has been written.
///
/// `Debug` and `Eq` are derived alongside `Clone`/`PartialEq` so the type can
/// be logged, used in `assert_eq!`, and compared with full equality semantics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Frame {
    /// Size of the frame in bytes. `FrameConsumer::read_all` compares the
    /// length of the first chunk against this value.
    pub size: usize,
}
impl Frame {
pub fn new(size: usize) -> Frame {
Self { size }
}
pub fn produce(self) -> (FrameProducer, FrameConsumer) {
let (send, recv) = watch::channel(FrameState::default());
let writer = FrameProducer::new(send, self.clone());
let reader = FrameConsumer::new(recv, self);
(writer, reader)
}
}
/// State shared between a `FrameProducer` and its `FrameConsumer`s through a
/// `watch` channel.
struct FrameState {
/// Chunks written so far, in the order they were pushed by
/// `FrameProducer::write`.
chunks: Vec<Bytes>,
/// `Ok(())` until `FrameProducer::close` stores an error here.
closed: Result<(), Error>,
}
impl Default for FrameState {
fn default() -> Self {
Self {
chunks: Vec::new(),
closed: Ok(()),
}
}
}
/// Writing half of a frame: appends chunks to the shared state and can close
/// the frame with an error.
#[derive(Clone)]
pub struct FrameProducer {
/// Sending side of the `watch` channel that holds the shared `FrameState`.
state: watch::Sender<FrameState>,
/// Description of the frame being produced; also reachable through `Deref`.
pub info: Frame,
}
impl FrameProducer {
fn new(state: watch::Sender<FrameState>, info: Frame) -> Self {
Self { state, info }
}
pub fn write<B: Into<Bytes>>(&mut self, chunk: B) {
self.state.send_modify(|state| state.chunks.push(chunk.into()));
}
pub fn close(self, err: Error) {
self.state.send_modify(|state| state.closed = Err(err));
}
pub fn subscribe(&self) -> FrameConsumer {
FrameConsumer::new(self.state.subscribe(), self.info.clone())
}
}
/// Lets the fields of the underlying `Frame` be read directly on the producer.
impl ops::Deref for FrameProducer {
    type Target = Frame;

    fn deref(&self) -> &Frame {
        let Self { info, .. } = self;
        info
    }
}
/// Reading half of a frame: returns the chunks stored in the shared state one
/// at a time.
///
/// A clone copies `index`, so it continues from the same position as the
/// original and then advances independently.
#[derive(Clone)]
pub struct FrameConsumer {
/// Receiving side of the `watch` channel that holds the shared `FrameState`.
state: watch::Receiver<FrameState>,
/// Description of the frame being consumed; also reachable through `Deref`.
pub info: Frame,
/// Index into `FrameState::chunks` of the next chunk `read` will return.
index: usize,
}
impl FrameConsumer {
fn new(state: watch::Receiver<FrameState>, group: Frame) -> Self {
Self {
state,
info: group,
index: 0,
}
}
pub async fn read(&mut self) -> Result<Option<Bytes>, Error> {
loop {
{
let state = self.state.borrow_and_update();
if let Some(chunk) = state.chunks.get(self.index).cloned() {
self.index += 1;
return Ok(Some(chunk));
}
state.closed.clone()?;
}
if self.state.changed().await.is_err() {
return Ok(None);
}
}
}
pub async fn read_all(&mut self) -> Result<Bytes, Error> {
let first = self.read().await?.unwrap_or_else(Bytes::new);
if first.len() == self.size {
return Ok(first);
}
let mut buf = BytesMut::with_capacity(2 * first.len());
buf.extend_from_slice(&first);
while let Some(chunk) = self.read().await? {
buf.extend_from_slice(&chunk);
}
Ok(buf.freeze())
}
pub async fn closed(&self) -> Result<(), Error> {
match self.state.clone().wait_for(|state| state.closed.is_err()).await {
Ok(state) => state.closed.clone(),
Err(_) => Ok(()),
}
}
}
/// Lets the fields of the underlying `Frame` be read directly on the consumer.
impl ops::Deref for FrameConsumer {
    type Target = Frame;

    fn deref(&self) -> &Frame {
        let Self { info, .. } = self;
        info
    }
}