use std::time::Duration;
use futures_util::{Stream, StreamExt};
use poem::{
web::sse::{Event, SSE},
IntoResponse, Response,
};
use crate::{
payload::Payload,
registry::{MetaMediaType, MetaResponse, MetaResponses, MetaSchema, MetaSchemaRef, Registry},
types::{ToJSON, Type},
ApiResponse,
};
type ToEventFn<T> = Box<dyn (FnMut(T) -> Event) + Send + 'static>;
pub struct EventStream<T: Stream + Send + 'static> {
stream: T,
keep_alive: Option<Duration>,
to_event: Option<ToEventFn<T::Item>>,
}
impl<T: Stream + Send + 'static> EventStream<T> {
pub fn new(stream: T) -> Self {
Self {
stream,
keep_alive: None,
to_event: None,
}
}
#[must_use]
pub fn keep_alive(self, duration: Duration) -> Self {
Self {
keep_alive: Some(duration),
..self
}
}
#[must_use]
pub fn to_event(self, f: impl FnMut(T::Item) -> Event + Send + 'static) -> Self {
Self {
to_event: Some(Box::new(f)),
..self
}
}
}
impl<T: Stream<Item = E> + Send + 'static, E: Type + ToJSON> Payload for EventStream<T> {
const CONTENT_TYPE: &'static str = "text/event-stream";
fn schema_ref() -> MetaSchemaRef {
MetaSchemaRef::Inline(Box::new(MetaSchema {
items: Some(Box::new(E::schema_ref())),
..MetaSchema::new_with_format("array", "event-stream")
}))
}
fn register(registry: &mut Registry) {
E::register(registry);
}
}
impl<T: Stream<Item = E> + Send + 'static, E: Type + ToJSON + 'static> IntoResponse
for EventStream<T>
{
fn into_response(self) -> Response {
let mut sse = match self.to_event {
Some(to_event) => SSE::new(self.stream.map(to_event)),
None => SSE::new(
self.stream
.map(|message| message.to_json_string())
.map(Event::message),
),
};
if let Some(keep_alive) = self.keep_alive {
sse = sse.keep_alive(keep_alive);
}
sse.into_response()
}
}
impl<T: Stream<Item = E> + Send + 'static, E: Type + ToJSON> ApiResponse for EventStream<T> {
fn meta() -> MetaResponses {
MetaResponses {
responses: vec![MetaResponse {
description: "",
status: Some(200),
content: vec![MetaMediaType {
content_type: Self::CONTENT_TYPE,
schema: Self::schema_ref(),
}],
headers: vec![],
}],
}
}
fn register(registry: &mut Registry) {
E::register(registry);
}
}