use std::{cmp, collections::BinaryHeap, ops::Deref, sync::Arc};
use super::{ServeError, Track};
use crate::watch::State;
use bytes::Bytes;
use crate::data::ObjectStatus;
pub struct Objects {
pub track: Arc<Track>,
}
impl Objects {
pub fn produce(self) -> (ObjectsWriter, ObjectsReader) {
let (writer, reader) = State::default().split();
let writer = ObjectsWriter {
state: writer,
track: self.track.clone(),
};
let reader = ObjectsReader::new(reader, self.track);
(writer, reader)
}
}
struct ObjectsState {
objects: Vec<ObjectReader>,
epoch: usize,
closed: Result<(), ServeError>,
}
impl Default for ObjectsState {
fn default() -> Self {
Self {
objects: Vec::new(),
epoch: 0,
closed: Ok(()),
}
}
}
pub struct ObjectsWriter {
state: State<ObjectsState>,
pub track: Arc<Track>,
}
impl ObjectsWriter {
pub fn write(&mut self, object: Object, payload: Bytes) -> Result<(), ServeError> {
let mut writer = self.create(object)?;
writer.write(payload)?;
Ok(())
}
pub fn create(&mut self, object: Object) -> Result<ObjectWriter, ServeError> {
let object = ObjectInfo {
track: self.track.clone(),
group_id: object.group_id,
object_id: object.object_id,
priority: object.priority,
status: ObjectStatus::Object,
};
let (writer, reader) = object.produce();
let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?;
if let Some(first) = state.objects.first() {
match writer.group_id.cmp(&first.group_id) {
cmp::Ordering::Less => return Ok(writer),
cmp::Ordering::Greater => state.objects.clear(),
cmp::Ordering::Equal => {}
}
}
state.objects.push(reader);
state.epoch += 1;
Ok(writer)
}
pub fn close(self, err: ServeError) -> Result<(), ServeError> {
let state = self.state.lock();
state.closed.clone()?;
let mut state = state.into_mut().ok_or(ServeError::Cancel)?;
state.closed = Err(err);
Ok(())
}
}
impl Deref for ObjectsWriter {
type Target = Track;
fn deref(&self) -> &Self::Target {
&self.track
}
}
#[derive(Clone)]
pub struct ObjectsReader {
state: State<ObjectsState>,
pub info: Arc<Track>,
epoch: usize,
pending: BinaryHeap<ObjectReader>,
}
impl ObjectsReader {
fn new(state: State<ObjectsState>, info: Arc<Track>) -> Self {
Self {
state,
info,
epoch: 0,
pending: BinaryHeap::new(),
}
}
pub async fn next(&mut self) -> Result<Option<ObjectReader>, ServeError> {
loop {
{
let state = self.state.lock();
if self.epoch < state.epoch {
let index = state.objects.len().saturating_sub(state.epoch - self.epoch);
for object in &state.objects[index..] {
self.pending.push(object.clone());
}
self.epoch = state.epoch;
}
if let Some(object) = self.pending.pop() {
return Ok(Some(object));
}
state.closed.clone()?;
match state.modified() {
Some(notify) => notify,
None => return Ok(None), }
}
.await;
}
}
pub fn latest(&self) -> Option<(u64, u64)> {
let state = self.state.lock();
state
.objects
.iter()
.max_by_key(|a| (a.group_id, a.object_id))
.map(|a| (a.group_id, a.object_id))
}
}
impl Deref for ObjectsReader {
type Target = Track;
fn deref(&self) -> &Self::Target {
&self.info
}
}
#[derive(Clone, PartialEq, Debug)]
pub struct ObjectInfo {
pub track: Arc<Track>,
pub group_id: u64,
pub object_id: u64,
pub priority: u8,
pub status: ObjectStatus,
}
impl Deref for ObjectInfo {
type Target = Track;
fn deref(&self) -> &Self::Target {
&self.track
}
}
impl ObjectInfo {
pub fn produce(self) -> (ObjectWriter, ObjectReader) {
let (writer, reader) = State::default().split();
let info = Arc::new(self);
let writer = ObjectWriter::new(writer, info.clone());
let reader = ObjectReader::new(reader, info);
(writer, reader)
}
}
pub struct Object {
pub group_id: u64,
pub object_id: u64,
pub priority: u8,
}
struct ObjectState {
chunks: Vec<Bytes>,
closed: Result<(), ServeError>,
}
impl Default for ObjectState {
fn default() -> Self {
Self {
chunks: Vec::new(),
closed: Ok(()),
}
}
}
pub struct ObjectWriter {
state: State<ObjectState>,
pub info: Arc<ObjectInfo>,
}
impl ObjectWriter {
fn new(state: State<ObjectState>, object: Arc<ObjectInfo>) -> Self {
Self { state, info: object }
}
pub fn write(&mut self, chunk: Bytes) -> Result<(), ServeError> {
let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?;
state.chunks.push(chunk);
Ok(())
}
pub fn close(self, err: ServeError) -> Result<(), ServeError> {
let state = self.state.lock();
state.closed.clone()?;
let mut state = state.into_mut().ok_or(ServeError::Cancel)?;
state.closed = Err(err);
Ok(())
}
}
impl Deref for ObjectWriter {
type Target = ObjectInfo;
fn deref(&self) -> &Self::Target {
&self.info
}
}
#[derive(Clone)]
pub struct ObjectReader {
state: State<ObjectState>,
pub info: Arc<ObjectInfo>,
index: usize,
}
impl ObjectReader {
fn new(state: State<ObjectState>, object: Arc<ObjectInfo>) -> Self {
Self {
state,
info: object,
index: 0,
}
}
pub async fn read(&mut self) -> Result<Option<Bytes>, ServeError> {
loop {
{
let state = self.state.lock();
if self.index < state.chunks.len() {
let chunk = state.chunks[self.index].clone();
self.index += 1;
return Ok(Some(chunk));
}
state.closed.clone()?;
match state.modified() {
Some(notify) => notify,
None => return Ok(None), }
}
.await; }
}
pub async fn read_all(&mut self) -> Result<Bytes, ServeError> {
let mut chunks = Vec::new();
while let Some(chunk) = self.read().await? {
chunks.push(chunk);
}
Ok(Bytes::from(chunks.concat()))
}
}
impl Ord for ObjectReader {
fn cmp(&self, other: &Self) -> cmp::Ordering {
other
.priority
.cmp(&self.priority) .then_with(|| self.group_id.cmp(&other.group_id)) .then_with(|| other.object_id.cmp(&self.object_id)) }
}
impl PartialOrd for ObjectReader {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for ObjectReader {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == cmp::Ordering::Equal
}
}
impl Eq for ObjectReader {}
impl Deref for ObjectReader {
type Target = ObjectInfo;
fn deref(&self) -> &Self::Target {
&self.info
}
}