poem_openapi/payload/
event_stream.rs

1use std::time::Duration;
2
3use futures_util::{Stream, StreamExt};
4use poem::{
5    web::sse::{Event, SSE},
6    IntoResponse, Response,
7};
8
9use crate::{
10    payload::Payload,
11    registry::{MetaMediaType, MetaResponse, MetaResponses, MetaSchema, MetaSchemaRef, Registry},
12    types::{ToJSON, Type},
13    ApiResponse,
14};
15
16type ToEventFn<T> = Box<dyn (FnMut(T) -> Event) + Send + 'static>;
17
18/// An event stream payload.
19///
20/// Reference: <https://github.com/OAI/OpenAPI-Specification/issues/396#issuecomment-894718960>
21pub struct EventStream<T: Stream + Send + 'static> {
22    stream: T,
23    keep_alive: Option<Duration>,
24    to_event: Option<ToEventFn<T::Item>>,
25}
26
27impl<T: Stream + Send + 'static> EventStream<T> {
28    /// Create an event stream payload.
29    pub fn new(stream: T) -> Self {
30        Self {
31            stream,
32            keep_alive: None,
33            to_event: None,
34        }
35    }
36
37    /// Set the keep alive interval.
38    #[must_use]
39    pub fn keep_alive(self, duration: Duration) -> Self {
40        Self {
41            keep_alive: Some(duration),
42            ..self
43        }
44    }
45
46    /// Set a function used to convert the message to SSE event.
47    ///
48    /// # Examples
49    ///
50    /// ```rust
51    /// use poem::web::sse::Event;
52    /// use poem_openapi::{payload::EventStream, types::ToJSON, Object};
53    ///
54    /// #[derive(Debug, Object)]
55    /// struct MyEvent {
56    ///     value: i32,
57    /// }
58    ///
59    /// EventStream::new(futures_util::stream::iter(vec![
60    ///     MyEvent { value: 1 },
61    ///     MyEvent { value: 2 },
62    ///     MyEvent { value: 3 },
63    /// ]))
64    /// .to_event(|event| {
65    ///     let json = event.to_json_string();
66    ///     Event::message(json).event_type("push")
67    /// });
68    /// ```
69    #[must_use]
70    pub fn to_event(self, f: impl FnMut(T::Item) -> Event + Send + 'static) -> Self {
71        Self {
72            to_event: Some(Box::new(f)),
73            ..self
74        }
75    }
76}
77
78impl<T: Stream<Item = E> + Send + 'static, E: Type + ToJSON> Payload for EventStream<T> {
79    const CONTENT_TYPE: &'static str = "text/event-stream";
80
81    fn schema_ref() -> MetaSchemaRef {
82        MetaSchemaRef::Inline(Box::new(MetaSchema {
83            items: Some(Box::new(E::schema_ref())),
84            ..MetaSchema::new_with_format("array", "event-stream")
85        }))
86    }
87
88    fn register(registry: &mut Registry) {
89        E::register(registry);
90    }
91}
92
93impl<T: Stream<Item = E> + Send + 'static, E: Type + ToJSON + 'static> IntoResponse
94    for EventStream<T>
95{
96    fn into_response(self) -> Response {
97        let mut sse = match self.to_event {
98            Some(to_event) => SSE::new(self.stream.map(to_event)),
99            None => SSE::new(
100                self.stream
101                    .map(|message| message.to_json_string())
102                    .map(Event::message),
103            ),
104        };
105
106        if let Some(keep_alive) = self.keep_alive {
107            sse = sse.keep_alive(keep_alive);
108        }
109
110        sse.into_response()
111    }
112}
113
114impl<T: Stream<Item = E> + Send + 'static, E: Type + ToJSON> ApiResponse for EventStream<T> {
115    fn meta() -> MetaResponses {
116        MetaResponses {
117            responses: vec![MetaResponse {
118                description: "",
119                status: Some(200),
120                status_range: None,
121                content: vec![MetaMediaType {
122                    content_type: Self::CONTENT_TYPE,
123                    schema: Self::schema_ref(),
124                }],
125                headers: vec![],
126            }],
127        }
128    }
129
130    fn register(registry: &mut Registry) {
131        E::register(registry);
132    }
133}