1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use std::time::Duration;
use futures_util::{Stream, StreamExt};
use poem::{
web::sse::{Event, SSE},
IntoResponse, Response,
};
use crate::{
payload::Payload,
registry::{MetaMediaType, MetaResponse, MetaResponses, MetaSchema, MetaSchemaRef, Registry},
types::{ToJSON, Type},
ApiResponse,
};
/// A response wrapper that pairs a stream of items with an optional keep-alive interval.
///
/// The wrapper itself places no bounds on `T`; the trait implementations in this file
/// require `T` to be a `Stream` whose items can be converted to JSON.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EventStream<T> {
    /// The wrapped source of items.
    stream: T,
    /// Keep-alive interval handed to the SSE response when set; `None` leaves the
    /// SSE response's keep-alive setting untouched.
    keep_alive: Option<Duration>,
}

impl<T> EventStream<T> {
    /// Wraps `stream` in a new `EventStream` with no keep-alive interval configured.
    pub fn new(stream: T) -> Self {
        let keep_alive = None;
        Self { stream, keep_alive }
    }

    /// Returns this `EventStream` with its keep-alive interval set to `duration`.
    ///
    /// The wrapped stream is carried over unchanged; any previously configured
    /// interval is replaced.
    #[must_use]
    pub fn keep_alive(mut self, duration: Duration) -> Self {
        self.keep_alive = Some(duration);
        self
    }
}
/// Payload metadata for an `EventStream`: the content type is `text/event-stream` and the
/// schema is an inline schema whose `items` is the item type's schema reference.
impl<T, E> Payload for EventStream<T>
where
    T: Stream<Item = E> + Send + 'static,
    E: Type + ToJSON,
{
    const CONTENT_TYPE: &'static str = "text/event-stream";

    /// Builds the inline schema from `MetaSchema::new_with_format("array", "event-stream")`
    /// with `items` set to `E::schema_ref()`.
    fn schema_ref() -> MetaSchemaRef {
        // Resolve the item schema first, then attach it to the base schema.
        let item_schema = E::schema_ref();
        let mut schema = MetaSchema::new_with_format("array", "event-stream");
        schema.items = Some(Box::new(item_schema));
        MetaSchemaRef::Inline(Box::new(schema))
    }
}
/// Turns an `EventStream` into an SSE response.
///
/// Every stream item is converted with `ToJSON::to_json`, serialized by
/// `serde_json::to_string`, and wrapped in `Event::message`.
impl<T, E> IntoResponse for EventStream<T>
where
    T: Stream<Item = E> + Send + 'static,
    E: Type + ToJSON,
{
    fn into_response(self) -> Response {
        let Self { stream, keep_alive } = self;

        // `scan` finishes the stream the first time the closure produces `None`, so an item
        // that fails to serialize ends the event stream silently and nothing after it is
        // emitted.
        let events = stream.scan((), |_, item| {
            let event = serde_json::to_string(&item.to_json())
                .ok()
                .map(|json| Event::message(json));
            futures_util::future::ready(event)
        });

        let sse = SSE::new(events);
        // Only touch the SSE keep-alive setting when an interval was configured.
        match keep_alive {
            Some(interval) => sse.keep_alive(interval).into_response(),
            None => sse.into_response(),
        }
    }
}
/// API response metadata for an `EventStream`.
impl<T, E> ApiResponse for EventStream<T>
where
    T: Stream<Item = E> + Send + 'static,
    E: Type + ToJSON,
{
    /// Describes a single response: status `200`, an empty description, no headers, and one
    /// media type built from this payload's `CONTENT_TYPE` and `schema_ref()`.
    fn meta() -> MetaResponses {
        let media_type = MetaMediaType {
            content_type: Self::CONTENT_TYPE,
            schema: Self::schema_ref(),
        };
        let ok_response = MetaResponse {
            description: "",
            status: Some(200),
            content: vec![media_type],
            headers: vec![],
        };
        MetaResponses {
            responses: vec![ok_response],
        }
    }

    /// Registers the item type `E` with `registry`.
    fn register(registry: &mut Registry) {
        E::register(registry);
    }
}