async_graphql/http/
multipart_subscribe.rs1use std::time::Duration;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use futures_timer::Delay;
5use futures_util::{stream::BoxStream, FutureExt, Stream, StreamExt};
6use mime::Mime;
7
8use crate::Response;
9
10static PART_HEADER: Bytes =
11 Bytes::from_static(b"--graphql\r\nContent-Type: application/json\r\n\r\n");
12static EOF: Bytes = Bytes::from_static(b"--graphql--\r\n");
13static CRLF: Bytes = Bytes::from_static(b"\r\n");
14static HEARTBEAT: Bytes = Bytes::from_static(b"{}\r\n");
15
16pub fn create_multipart_mixed_stream<'a>(
20 input: impl Stream<Item = Response> + Send + Unpin + 'a,
21 heartbeat_interval: Duration,
22) -> BoxStream<'a, Bytes> {
23 let mut input = input.fuse();
24 let mut heartbeat_timer = Delay::new(heartbeat_interval).fuse();
25
26 async_stream::stream! {
27 loop {
28 futures_util::select! {
29 item = input.next() => {
30 match item {
31 Some(resp) => {
32 let data = BytesMut::new();
33 let mut writer = data.writer();
34 if serde_json::to_writer(&mut writer, &resp).is_err() {
35 continue;
36 }
37
38 yield PART_HEADER.clone();
39 yield writer.into_inner().freeze();
40 yield CRLF.clone();
41 }
42 None => break,
43 }
44 }
45 _ = heartbeat_timer => {
46 heartbeat_timer = Delay::new(heartbeat_interval).fuse();
47 yield PART_HEADER.clone();
48 yield HEARTBEAT.clone();
49 }
50 }
51 }
52
53 yield EOF.clone();
54 }
55 .boxed()
56}
57
58fn parse_accept(accept: &str) -> Vec<Mime> {
59 let mut items = accept
60 .split(',')
61 .map(str::trim)
62 .filter_map(|item| {
63 let mime: Mime = item.parse().ok()?;
64 let q = mime
65 .get_param("q")
66 .and_then(|value| Some((value.as_str().parse::<f32>().ok()? * 1000.0) as i32))
67 .unwrap_or(1000);
68 Some((mime, q))
69 })
70 .collect::<Vec<_>>();
71 items.sort_by(|(_, qa), (_, qb)| qb.cmp(qa));
72 items.into_iter().map(|(mime, _)| mime).collect()
73}
74
75pub fn is_accept_multipart_mixed(accept: &str) -> bool {
88 for mime in parse_accept(accept) {
89 if mime.type_() == mime::APPLICATION && mime.subtype() == mime::JSON {
90 return false;
91 }
92
93 if mime.type_() == mime::MULTIPART
94 && mime.subtype() == "mixed"
95 && mime.get_param(mime::BOUNDARY).map(|value| value.as_str()) == Some("graphql")
96 && mime
97 .get_param("subscriptionSpec")
98 .map(|value| value.as_str())
99 == Some("1.0")
100 {
101 return true;
102 }
103 }
104
105 false
106}