poem_openapi/payload/
event_stream.rs1use std::time::Duration;
2
3use futures_util::{Stream, StreamExt};
4use poem::{
5 web::sse::{Event, SSE},
6 IntoResponse, Response,
7};
8
9use crate::{
10 payload::Payload,
11 registry::{MetaMediaType, MetaResponse, MetaResponses, MetaSchema, MetaSchemaRef, Registry},
12 types::{ToJSON, Type},
13 ApiResponse,
14};
15
/// Boxed, sendable closure that converts one stream item of type `T` into an
/// SSE [`Event`].
type ToEventFn<T> = Box<dyn (FnMut(T) -> Event) + Send + 'static>;
17
18pub struct EventStream<T: Stream + Send + 'static> {
22 stream: T,
23 keep_alive: Option<Duration>,
24 to_event: Option<ToEventFn<T::Item>>,
25}
26
27impl<T: Stream + Send + 'static> EventStream<T> {
28 pub fn new(stream: T) -> Self {
30 Self {
31 stream,
32 keep_alive: None,
33 to_event: None,
34 }
35 }
36
37 #[must_use]
39 pub fn keep_alive(self, duration: Duration) -> Self {
40 Self {
41 keep_alive: Some(duration),
42 ..self
43 }
44 }
45
46 #[must_use]
70 pub fn to_event(self, f: impl FnMut(T::Item) -> Event + Send + 'static) -> Self {
71 Self {
72 to_event: Some(Box::new(f)),
73 ..self
74 }
75 }
76}
77
78impl<T: Stream<Item = E> + Send + 'static, E: Type + ToJSON> Payload for EventStream<T> {
79 const CONTENT_TYPE: &'static str = "text/event-stream";
80
81 fn schema_ref() -> MetaSchemaRef {
82 MetaSchemaRef::Inline(Box::new(MetaSchema {
83 items: Some(Box::new(E::schema_ref())),
84 ..MetaSchema::new_with_format("array", "event-stream")
85 }))
86 }
87
88 fn register(registry: &mut Registry) {
89 E::register(registry);
90 }
91}
92
93impl<T: Stream<Item = E> + Send + 'static, E: Type + ToJSON + 'static> IntoResponse
94 for EventStream<T>
95{
96 fn into_response(self) -> Response {
97 let mut sse = match self.to_event {
98 Some(to_event) => SSE::new(self.stream.map(to_event)),
99 None => SSE::new(
100 self.stream
101 .map(|message| message.to_json_string())
102 .map(Event::message),
103 ),
104 };
105
106 if let Some(keep_alive) = self.keep_alive {
107 sse = sse.keep_alive(keep_alive);
108 }
109
110 sse.into_response()
111 }
112}
113
114impl<T: Stream<Item = E> + Send + 'static, E: Type + ToJSON> ApiResponse for EventStream<T> {
115 fn meta() -> MetaResponses {
116 MetaResponses {
117 responses: vec![MetaResponse {
118 description: "",
119 status: Some(200),
120 status_range: None,
121 content: vec![MetaMediaType {
122 content_type: Self::CONTENT_TYPE,
123 schema: Self::schema_ref(),
124 }],
125 headers: vec![],
126 }],
127 }
128 }
129
130 fn register(registry: &mut Registry) {
131 E::register(registry);
132 }
133}