use futures_lite::prelude::*;
use futures_lite::ready;
use std::task::{Context, Poll};
use std::io;
use std::pin::Pin;
use std::time::Duration;
pin_project_lite::pin_project! {
#[derive(Debug)]
pub struct Encoder {
buf: Box<[u8]>,
cursor: usize,
#[pin]
receiver: async_channel::Receiver<Vec<u8>>,
}
}
impl AsyncRead for Encoder {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let mut this = self.project();
if this.buf.len() <= *this.cursor {
match ready!(this.receiver.as_mut().poll_next(cx)) {
Some(buf) => {
log::trace!("> Received a new buffer with len {}", buf.len());
*this.buf = buf.into_boxed_slice();
*this.cursor = 0;
}
None => {
log::trace!("> Encoder done reading");
return Poll::Ready(Ok(0));
}
};
}
let local_buf = &this.buf[*this.cursor..];
let max = buf.len().min(local_buf.len());
buf[..max].clone_from_slice(&local_buf[..max]);
*this.cursor += max;
Poll::Ready(Ok(max))
}
}
impl AsyncBufRead for Encoder {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let mut this = self.project();
if this.buf.len() <= *this.cursor {
match ready!(this.receiver.as_mut().poll_next(cx)) {
Some(buf) => {
log::trace!("> Received a new buffer with len {}", buf.len());
*this.buf = buf.into_boxed_slice();
*this.cursor = 0;
}
None => {
log::trace!("> Encoder done reading");
return Poll::Ready(Ok(&[]));
}
};
}
Poll::Ready(Ok(&this.buf[*this.cursor..]))
}
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.project();
*this.cursor += amt;
}
}
#[derive(Debug, Clone)]
pub struct Sender(async_channel::Sender<Vec<u8>>);
pub fn encode() -> (Sender, Encoder) {
let (sender, receiver) = async_channel::bounded(1);
let encoder = Encoder {
receiver,
buf: Box::default(),
cursor: 0,
};
(Sender(sender), encoder)
}
impl Sender {
async fn inner_send(&self, bytes: impl Into<Vec<u8>>) -> io::Result<()> {
self.0
.send(bytes.into())
.await
.map_err(|_| io::Error::new(io::ErrorKind::ConnectionAborted, "sse disconnected"))
}
pub async fn send(
&self,
name: impl Into<Option<&str>>,
data: &str,
id: Option<&str>,
) -> io::Result<()> {
if let Some(name) = name.into() {
self.inner_send(format!("event:{}\n", name)).await?;
}
if let Some(id) = id {
self.inner_send(format!("id:{}\n", id)).await?;
}
for line in data.lines() {
let msg = format!("data:{}\n", line);
self.inner_send(msg).await?;
}
self.inner_send("\n").await?;
Ok(())
}
pub async fn send_retry(&self, dur: Duration, id: Option<&str>) -> io::Result<()> {
if let Some(id) = id {
self.inner_send(format!("id:{}\n", id)).await?;
}
let dur = dur.as_secs_f64() as u64;
let msg = format!("retry:{}\n\n", dur);
self.inner_send(msg).await?;
Ok(())
}
}