use bytes::Bytes;
use std::{cmp, ops::Deref, sync::Arc};
use crate::data::ObjectStatus;
use crate::watch::State;
use super::{ServeError, Track};
/// Entry point for serving a track as a sequence of groups.
///
/// Holds only the shared [`Track`]; call [`Groups::produce`] to split it
/// into a writer/reader pair.
pub struct Groups {
    /// Track that the groups belong to.
    pub track: Arc<Track>,
}
impl Groups {
    /// Consumes `self` and returns a connected ([`GroupsWriter`], [`GroupsReader`]) pair.
    ///
    /// Both halves are built from the two sides of one freshly created
    /// `State` and share the same `Arc<Track>`.
    pub fn produce(self) -> (GroupsWriter, GroupsReader) {
        let track = self.track;
        let (send, recv) = State::default().split();

        let producer = GroupsWriter::new(send, track.clone());
        let consumer = GroupsReader::new(recv, track);

        (producer, consumer)
    }
}
/// Exposes the underlying [`Track`] directly on `Groups`.
impl Deref for Groups {
    type Target = Track;

    fn deref(&self) -> &Track {
        &*self.track
    }
}
/// State shared between a [`GroupsWriter`] and its [`GroupsReader`]s.
struct GroupsState {
    /// Reader for the group with the highest ID created so far, if any.
    latest: Option<GroupReader>,

    /// Incremented every time `latest` is replaced so readers can detect a new group.
    epoch: u64,

    /// `Ok(())` while open; holds the closing error afterwards.
    closed: Result<(), ServeError>,
}
impl Default for GroupsState {
fn default() -> Self {
Self {
latest: None,
epoch: 0,
closed: Ok(()),
}
}
}
/// Writing half of a [`Groups`] track: creates new groups and publishes the latest one.
pub struct GroupsWriter {
    /// Track that the groups belong to.
    pub info: Arc<Track>,

    /// Shared state observed by the readers.
    state: State<GroupsState>,

    /// Group ID that `append` will use next (one past the latest group created).
    next: u64,
}
impl GroupsWriter {
    /// Builds a writer over `state` for `track`, starting at group ID 0.
    fn new(state: State<GroupsState>, track: Arc<Track>) -> Self {
        Self {
            state,
            info: track,
            next: 0,
        }
    }

    /// Creates a group using the next sequential group ID.
    ///
    /// # Errors
    /// Same as [`GroupsWriter::create`].
    pub fn append(&mut self, priority: u8) -> Result<GroupWriter, ServeError> {
        let group_id = self.next;
        self.create(Group { group_id, priority })
    }

    /// Creates a group with an explicit ID and publishes it if it is the newest.
    ///
    /// * An ID greater than the current latest (or the first group ever)
    ///   replaces `latest`, bumps the epoch and sets the next `append` ID to
    ///   `group_id + 1`.
    /// * An ID lower than the current latest still returns a writer, but its
    ///   reader is never stored in the shared state, so readers do not see it.
    ///
    /// # Errors
    /// * `ServeError::Cancel` if the shared state can no longer be locked for
    ///   writing (`lock_mut` returned `None`).
    /// * `ServeError::Duplicate` if the ID equals the current latest group ID.
    pub fn create(&mut self, group: Group) -> Result<GroupWriter, ServeError> {
        let group_id = group.group_id;
        let info = GroupInfo {
            track: self.info.clone(),
            group_id,
            priority: group.priority,
        };
        let (writer, reader) = info.produce();

        let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?;

        // Compare against the currently published group, if there is one.
        let ordering = state
            .latest
            .as_ref()
            .map(|latest| group_id.cmp(&latest.group_id));

        match ordering {
            // Older than what is already published: hand back the writer only.
            Some(cmp::Ordering::Less) => return Ok(writer),
            Some(cmp::Ordering::Equal) => return Err(ServeError::Duplicate),
            Some(cmp::Ordering::Greater) | None => state.latest = Some(reader),
        }

        // `latest` now holds the reader for `group_id`.
        self.next = group_id + 1;
        state.epoch += 1;

        Ok(writer)
    }

    /// Closes the set of groups with `err`, consuming the writer.
    ///
    /// # Errors
    /// * The previously stored error if the state was already closed.
    /// * `ServeError::Cancel` if the state cannot be upgraded for writing.
    pub fn close(self, err: ServeError) -> Result<(), ServeError> {
        let current = self.state.lock();
        if let Err(previous) = &current.closed {
            return Err(previous.clone());
        }

        let mut state = match current.into_mut() {
            Some(state) => state,
            None => return Err(ServeError::Cancel),
        };
        state.closed = Err(err);

        Ok(())
    }
}
/// Exposes the underlying [`Track`] directly on the writer.
impl Deref for GroupsWriter {
    type Target = Track;

    fn deref(&self) -> &Track {
        &*self.info
    }
}
/// Reading half of a [`Groups`] track.
///
/// Cloning copies `epoch` too, so each clone tracks independently which
/// group it has already returned.
#[derive(Clone)]
pub struct GroupsReader {
    /// Track that the groups belong to.
    pub info: Arc<Track>,

    /// Shared state written by the `GroupsWriter`.
    state: State<GroupsState>,

    /// Epoch of the shared state as of the last group this reader returned.
    epoch: u64,
}
impl GroupsReader {
    /// Builds a reader over `state` for `track` that has not yet seen any group.
    fn new(state: State<GroupsState>, track: Arc<Track>) -> Self {
        Self {
            state,
            info: track,
            epoch: 0,
        }
    }

    /// Waits for a group newer than the last one this reader returned.
    ///
    /// Only the latest group is kept in the shared state, so groups created
    /// between two calls are skipped.
    ///
    /// Returns `Ok(None)` when `modified()` yields `None` (presumably because
    /// no further updates can arrive — confirm against `watch::State`).
    ///
    /// # Errors
    /// The stored closing error, once the state is closed and there is no
    /// unseen group left.
    pub async fn next(&mut self) -> Result<Option<GroupReader>, ServeError> {
        loop {
            // The lock is confined to this block so it is released before awaiting.
            let changed = {
                let state = self.state.lock();

                if state.epoch != self.epoch {
                    self.epoch = state.epoch;
                    return Ok(state.latest.clone());
                }

                if let Err(err) = &state.closed {
                    return Err(err.clone());
                }

                match state.modified() {
                    Some(changed) => changed,
                    None => return Ok(None),
                }
            };

            // Re-check once the state has been modified.
            changed.await;
        }
    }

    /// Returns `(group_id, latest object id)` for the newest group, or `None`
    /// if no group has been created yet.
    pub fn latest(&self) -> Option<(u64, u64)> {
        let state = self.state.lock();
        let group = state.latest.as_ref()?;
        Some((group.group_id, group.latest()))
    }
}
/// Exposes the underlying [`Track`] directly on the reader.
impl Deref for GroupsReader {
    type Target = Track;

    fn deref(&self) -> &Track {
        &*self.info
    }
}
/// Parameters for creating a group: its ID and its priority.
///
/// Both fields are plain integers, so besides `PartialEq` the type also
/// derives `Eq` and `Hash`, which lets it be used as a set or map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Group {
    /// Identifier of the group within its track.
    pub group_id: u64,

    /// Priority value for the group.
    pub priority: u8,
}
/// Static description of one group: the track it belongs to, its ID and priority.
#[derive(Debug, Clone, PartialEq)]
pub struct GroupInfo {
    /// Track that the group belongs to.
    pub track: Arc<Track>,

    /// Identifier of the group within its track.
    pub group_id: u64,

    /// Priority value for the group.
    pub priority: u8,
}
impl GroupInfo {
    /// Consumes `self` and returns a connected ([`GroupWriter`], [`GroupReader`]) pair.
    ///
    /// The info is moved into an `Arc` shared by both halves.
    pub fn produce(self) -> (GroupWriter, GroupReader) {
        let shared = Arc::new(self);
        let (send, recv) = State::default().split();

        let producer = GroupWriter::new(send, Arc::clone(&shared));
        let consumer = GroupReader::new(recv, shared);

        (producer, consumer)
    }
}
/// Exposes the owning [`Track`] directly on `GroupInfo`.
impl Deref for GroupInfo {
    type Target = Track;

    fn deref(&self) -> &Track {
        &*self.track
    }
}
/// State shared between a [`GroupWriter`] and its [`GroupReader`]s.
struct GroupState {
    /// Readers for every object created in this group, in creation order.
    objects: Vec<GroupObjectReader>,

    /// `Ok(())` while open; holds the closing error afterwards.
    closed: Result<(), ServeError>,
}
impl Default for GroupState {
fn default() -> Self {
Self {
objects: Vec::new(),
closed: Ok(()),
}
}
}
/// Writing half of a group: creates objects in sequence.
pub struct GroupWriter {
    /// Shared state observed by the readers.
    state: State<GroupState>,

    /// Description of the group being written.
    pub info: Arc<GroupInfo>,

    /// Object ID assigned to the next object created.
    next: u64,
}
impl GroupWriter {
    /// Builds a writer over `state` for `group`, starting at object ID 0.
    fn new(state: State<GroupState>, group: Arc<GroupInfo>) -> Self {
        Self {
            info: group,
            next: 0,
            state,
        }
    }

    /// Creates an object sized exactly for `payload` and writes it as a single chunk.
    ///
    /// # Errors
    /// Any error from [`GroupWriter::create`] or from writing the chunk.
    pub fn write(&mut self, payload: bytes::Bytes) -> Result<(), ServeError> {
        let mut object = self.create(payload.len())?;
        object.write(payload)
    }

    /// Creates the next object in the group with a declared `size` in bytes.
    ///
    /// The object ID counter advances before the shared state is locked, so
    /// it is incremented even when this call fails.
    ///
    /// # Errors
    /// `ServeError::Cancel` if the shared state can no longer be locked for
    /// writing (`lock_mut` returned `None`).
    pub fn create(&mut self, size: usize) -> Result<GroupObjectWriter, ServeError> {
        let object = GroupObject {
            group: self.info.clone(),
            object_id: self.next,
            status: ObjectStatus::Object,
            size,
        };
        let (writer, reader) = object.produce();

        self.next += 1;

        let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?;
        state.objects.push(reader);

        Ok(writer)
    }

    /// Closes the group with `err`, consuming the writer.
    ///
    /// # Errors
    /// * The previously stored error if the group was already closed.
    /// * `ServeError::Cancel` if the state cannot be upgraded for writing.
    pub fn close(self, err: ServeError) -> Result<(), ServeError> {
        let current = self.state.lock();
        if let Err(previous) = &current.closed {
            return Err(previous.clone());
        }

        let mut state = match current.into_mut() {
            Some(state) => state,
            None => return Err(ServeError::Cancel),
        };
        state.closed = Err(err);

        Ok(())
    }

    /// Number of objects created in this group so far.
    pub fn len(&self) -> usize {
        self.state.lock().objects.len()
    }

    /// Whether no object has been created in this group yet.
    pub fn is_empty(&self) -> bool {
        self.state.lock().objects.is_empty()
    }
}
/// Exposes the [`GroupInfo`] fields directly on the writer.
impl Deref for GroupWriter {
    type Target = GroupInfo;

    fn deref(&self) -> &GroupInfo {
        &*self.info
    }
}
/// Reading half of a group.
///
/// Cloning copies `index` too, so each clone keeps its own read position.
#[derive(Clone)]
pub struct GroupReader {
    /// Shared state written by the `GroupWriter`.
    state: State<GroupState>,

    /// Description of the group being read.
    pub info: Arc<GroupInfo>,

    /// Index of the next object this reader will return.
    index: usize,
}
impl GroupReader {
    /// Builds a reader over `state` for `group`, positioned at the first object.
    fn new(state: State<GroupState>, group: Arc<GroupInfo>) -> Self {
        Self {
            info: group,
            index: 0,
            state,
        }
    }

    /// Object ID of the most recently created object, or `0` if there is none.
    ///
    /// An empty group and a group whose last object has ID 0 both return `0`.
    pub fn latest(&self) -> u64 {
        let state = self.state.lock();
        state.objects.last().map_or(0, |object| object.object_id)
    }

    /// Waits for the next object and reads its whole payload.
    ///
    /// Returns `Ok(None)` when [`GroupReader::next`] does.
    ///
    /// # Errors
    /// Any error from [`GroupReader::next`] or from reading the object.
    pub async fn read_next(&mut self) -> Result<Option<Bytes>, ServeError> {
        match self.next().await? {
            Some(mut object) => object.read_all().await.map(Some),
            None => Ok(None),
        }
    }

    /// Waits for the next object in the group and returns a reader for it.
    ///
    /// Returns `Ok(None)` when `modified()` yields `None` (presumably because
    /// no further updates can arrive — confirm against `watch::State`).
    ///
    /// # Errors
    /// The stored closing error, once the group is closed and every object
    /// has been returned.
    pub async fn next(&mut self) -> Result<Option<GroupObjectReader>, ServeError> {
        loop {
            // The lock is confined to this block so it is released before awaiting.
            let changed = {
                let state = self.state.lock();

                if let Some(object) = state.objects.get(self.index).cloned() {
                    self.index += 1;
                    return Ok(Some(object));
                }

                if let Err(err) = &state.closed {
                    return Err(err.clone());
                }

                match state.modified() {
                    Some(changed) => changed,
                    None => return Ok(None),
                }
            };

            // Re-check once the state has been modified.
            changed.await;
        }
    }

    /// Index of the next object this reader will return.
    pub fn pos(&self) -> usize {
        self.index
    }

    /// Number of objects created in this group so far.
    pub fn len(&self) -> usize {
        self.state.lock().objects.len()
    }

    /// Whether no object has been created in this group yet.
    pub fn is_empty(&self) -> bool {
        self.state.lock().objects.is_empty()
    }
}
/// Exposes the [`GroupInfo`] fields directly on the reader.
impl Deref for GroupReader {
    type Target = GroupInfo;

    fn deref(&self) -> &GroupInfo {
        &*self.info
    }
}
/// Static description of one object inside a group.
#[derive(Clone, PartialEq, Debug)]
pub struct GroupObject {
    /// Group that the object belongs to.
    pub group: Arc<GroupInfo>,

    /// Identifier of the object within its group.
    pub object_id: u64,

    /// Declared payload size in bytes; the writer enforces it.
    pub size: usize,

    /// Status attached to the object.
    pub status: ObjectStatus,
}
impl GroupObject {
    /// Consumes `self` and returns a connected
    /// ([`GroupObjectWriter`], [`GroupObjectReader`]) pair.
    ///
    /// The description is moved into an `Arc` shared by both halves.
    pub fn produce(self) -> (GroupObjectWriter, GroupObjectReader) {
        let shared = Arc::new(self);
        let (send, recv) = State::default().split();

        let producer = GroupObjectWriter::new(send, Arc::clone(&shared));
        let consumer = GroupObjectReader::new(recv, shared);

        (producer, consumer)
    }
}
/// Exposes the owning [`GroupInfo`] directly on `GroupObject`.
impl Deref for GroupObject {
    type Target = GroupInfo;

    fn deref(&self) -> &GroupInfo {
        &*self.group
    }
}
/// State shared between a [`GroupObjectWriter`] and its [`GroupObjectReader`]s.
struct GroupObjectState {
    /// Payload chunks written so far, in write order.
    chunks: Vec<Bytes>,

    /// `Ok(())` while open; holds the closing error afterwards.
    closed: Result<(), ServeError>,
}
impl Default for GroupObjectState {
fn default() -> Self {
Self {
chunks: Vec::new(),
closed: Ok(()),
}
}
}
/// Writing half of an object: accepts chunks until the declared size is reached.
pub struct GroupObjectWriter {
    /// Shared state observed by the readers.
    state: State<GroupObjectState>,

    /// Description of the object being written.
    pub info: Arc<GroupObject>,

    /// Bytes still to be written before the declared size is reached.
    remain: usize,
}
impl GroupObjectWriter {
    /// Builds a writer over `state` that expects `object.size` bytes in total.
    fn new(state: State<GroupObjectState>, object: Arc<GroupObject>) -> Self {
        let remain = object.size;
        Self {
            state,
            info: object,
            remain,
        }
    }

    /// Appends `chunk` to the object's payload.
    ///
    /// The remaining-byte budget is reduced before the shared state is
    /// locked, so it shrinks even if the lock then fails.
    ///
    /// # Errors
    /// * `ServeError::Size` if `chunk` is longer than the bytes remaining.
    /// * `ServeError::Cancel` if the shared state can no longer be locked for
    ///   writing (`lock_mut` returned `None`).
    pub fn write(&mut self, chunk: Bytes) -> Result<(), ServeError> {
        // `checked_sub` yields `None` exactly when the chunk exceeds the budget.
        self.remain = self
            .remain
            .checked_sub(chunk.len())
            .ok_or(ServeError::Size)?;

        let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?;
        state.chunks.push(chunk);

        Ok(())
    }

    /// Closes the object with `err`, consuming the writer.
    ///
    /// # Errors
    /// * `ServeError::Size` if the declared size has not been fully written.
    ///   The writer is consumed regardless, so its `Drop` impl then records
    ///   `ServeError::Size` as the closing error instead of `err`.
    /// * The previously stored error if the object was already closed.
    /// * `ServeError::Cancel` if the state cannot be upgraded for writing.
    pub fn close(self, err: ServeError) -> Result<(), ServeError> {
        if self.remain > 0 {
            return Err(ServeError::Size);
        }

        let current = self.state.lock();
        if let Err(previous) = &current.closed {
            return Err(previous.clone());
        }

        let mut state = match current.into_mut() {
            Some(state) => state,
            None => return Err(ServeError::Cancel),
        };
        state.closed = Err(err);

        Ok(())
    }
}
/// Marks an incompletely written object as failed when its writer goes away.
impl Drop for GroupObjectWriter {
    fn drop(&mut self) {
        // A fully written object needs no further action.
        if self.remain != 0 {
            // Best effort: skipped if the state can no longer be locked for writing.
            if let Some(mut state) = self.state.lock_mut() {
                state.closed = Err(ServeError::Size);
            }
        }
    }
}
/// Exposes the [`GroupObject`] fields directly on the writer.
impl Deref for GroupObjectWriter {
    type Target = GroupObject;

    fn deref(&self) -> &GroupObject {
        &*self.info
    }
}
/// Reading half of an object.
///
/// Cloning copies `index` too, so each clone keeps its own read position.
#[derive(Clone)]
pub struct GroupObjectReader {
    /// Shared state written by the `GroupObjectWriter`.
    state: State<GroupObjectState>,

    /// Description of the object being read.
    pub info: Arc<GroupObject>,

    /// Index of the next chunk this reader will return.
    index: usize,
}
impl GroupObjectReader {
fn new(state: State<GroupObjectState>, object: Arc<GroupObject>) -> Self {
Self {
state,
info: object,
index: 0,
}
}
pub async fn read(&mut self) -> Result<Option<Bytes>, ServeError> {
loop {
{
let state = self.state.lock();
if self.index < state.chunks.len() {
let chunk = state.chunks[self.index].clone();
self.index += 1;
return Ok(Some(chunk));
}
state.closed.clone()?;
match state.modified() {
Some(notify) => notify,
None => return Ok(None), }
}
.await; }
}
pub async fn read_all(&mut self) -> Result<Bytes, ServeError> {
let mut chunks = Vec::new();
while let Some(chunk) = self.read().await? {
chunks.push(chunk);
}
Ok(Bytes::from(chunks.concat()))
}
}
/// Exposes the [`GroupObject`] fields directly on the reader.
impl Deref for GroupObjectReader {
    type Target = GroupObject;

    fn deref(&self) -> &GroupObject {
        &*self.info
    }
}