async_graphql/http/
multipart_subscribe.rs

1use std::time::Duration;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use futures_timer::Delay;
5use futures_util::{stream::BoxStream, FutureExt, Stream, StreamExt};
6use mime::Mime;
7
8use crate::Response;
9
10static PART_HEADER: Bytes =
11    Bytes::from_static(b"--graphql\r\nContent-Type: application/json\r\n\r\n");
12static EOF: Bytes = Bytes::from_static(b"--graphql--\r\n");
13static CRLF: Bytes = Bytes::from_static(b"\r\n");
14static HEARTBEAT: Bytes = Bytes::from_static(b"{}\r\n");
15
16/// Create a stream for `multipart/mixed` responses.
17///
18/// Reference: <https://www.apollographql.com/docs/router/executing-operations/subscription-multipart-protocol/>
19pub fn create_multipart_mixed_stream<'a>(
20    input: impl Stream<Item = Response> + Send + Unpin + 'a,
21    heartbeat_interval: Duration,
22) -> BoxStream<'a, Bytes> {
23    let mut input = input.fuse();
24    let mut heartbeat_timer = Delay::new(heartbeat_interval).fuse();
25
26    async_stream::stream! {
27        loop {
28            futures_util::select! {
29                item = input.next() => {
30                    match item {
31                        Some(resp) => {
32                            let data = BytesMut::new();
33                            let mut writer = data.writer();
34                            if serde_json::to_writer(&mut writer, &resp).is_err() {
35                                continue;
36                            }
37
38                            yield PART_HEADER.clone();
39                            yield writer.into_inner().freeze();
40                            yield CRLF.clone();
41                        }
42                        None => break,
43                    }
44                }
45                _ = heartbeat_timer => {
46                    heartbeat_timer = Delay::new(heartbeat_interval).fuse();
47                    yield PART_HEADER.clone();
48                    yield HEARTBEAT.clone();
49                }
50            }
51        }
52
53        yield EOF.clone();
54    }
55    .boxed()
56}
57
58fn parse_accept(accept: &str) -> Vec<Mime> {
59    let mut items = accept
60        .split(',')
61        .map(str::trim)
62        .filter_map(|item| {
63            let mime: Mime = item.parse().ok()?;
64            let q = mime
65                .get_param("q")
66                .and_then(|value| Some((value.as_str().parse::<f32>().ok()? * 1000.0) as i32))
67                .unwrap_or(1000);
68            Some((mime, q))
69        })
70        .collect::<Vec<_>>();
71    items.sort_by(|(_, qa), (_, qb)| qb.cmp(qa));
72    items.into_iter().map(|(mime, _)| mime).collect()
73}
74
75/// Check accept is multipart-mixed
76///
77/// # Example header
78///
79/// ```text
80/// Accept: multipart/mixed; boundary="graphql"; subscriptionSpec="1.0"
81/// ```
82///
83/// the value for boundary should always be `graphql`, and the value
84/// for `subscriptionSpec` should always be `1.0`.
85///
86/// Reference: <https://www.apollographql.com/docs/router/executing-operations/subscription-multipart-protocol/>
87pub fn is_accept_multipart_mixed(accept: &str) -> bool {
88    for mime in parse_accept(accept) {
89        if mime.type_() == mime::APPLICATION && mime.subtype() == mime::JSON {
90            return false;
91        }
92
93        if mime.type_() == mime::MULTIPART
94            && mime.subtype() == "mixed"
95            && mime.get_param(mime::BOUNDARY).map(|value| value.as_str()) == Some("graphql")
96            && mime
97                .get_param("subscriptionSpec")
98                .map(|value| value.as_str())
99                == Some("1.0")
100        {
101            return true;
102        }
103    }
104
105    false
106}