1use async_io::Timer;
4use futures_lite::io::{self, AsyncRead as Read, AsyncWrite as Write};
5use futures_lite::prelude::*;
6use http_types::headers::{CONNECTION, UPGRADE};
7use http_types::upgrade::Connection;
8use http_types::{Request, Response, StatusCode};
9use std::{future::Future, marker::PhantomData, time::Duration};
10mod body_reader;
11mod decode;
12mod encode;
13
14pub use decode::decode;
15pub use encode::Encoder;
16
/// Tunable settings for a [`Server`].
#[derive(Clone, Debug)]
pub struct ServerOptions {
    /// Upper bound on how long `Server::accept_one` waits for the request
    /// decoder to produce a request. `None` means the wait is unbounded.
    headers_timeout: Option<Duration>,
}
23
24impl Default for ServerOptions {
25 fn default() -> Self {
26 Self {
27 headers_timeout: Some(Duration::from_secs(60)),
28 }
29 }
30}
31
32pub async fn accept<RW, F, Fut>(io: RW, endpoint: F) -> http_types::Result<()>
36where
37 RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
38 F: Fn(Request) -> Fut,
39 Fut: Future<Output = http_types::Result<Response>>,
40{
41 Server::new(io, endpoint).accept().await
42}
43
44pub async fn accept_with_opts<RW, F, Fut>(
48 io: RW,
49 endpoint: F,
50 opts: ServerOptions,
51) -> http_types::Result<()>
52where
53 RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
54 F: Fn(Request) -> Fut,
55 Fut: Future<Output = http_types::Result<Response>>,
56{
57 Server::new(io, endpoint).with_opts(opts).accept().await
58}
59
60#[derive(Debug)]
62pub struct Server<RW, F, Fut> {
63 io: RW,
64 endpoint: F,
65 opts: ServerOptions,
66 _phantom: PhantomData<Fut>,
67}
68
/// Outcome of serving one request, telling the caller what to do with the
/// connection next.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// The connection must not be used for further requests.
    Close,

    /// The connection may be used to serve another request.
    KeepAlive,
}
78
79impl<RW, F, Fut> Server<RW, F, Fut>
80where
81 RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
82 F: Fn(Request) -> Fut,
83 Fut: Future<Output = http_types::Result<Response>>,
84{
85 pub fn new(io: RW, endpoint: F) -> Self {
87 Self {
88 io,
89 endpoint,
90 opts: Default::default(),
91 _phantom: PhantomData,
92 }
93 }
94
95 pub fn with_opts(mut self, opts: ServerOptions) -> Self {
97 self.opts = opts;
98 self
99 }
100
101 pub async fn accept(&mut self) -> http_types::Result<()> {
103 while ConnectionStatus::KeepAlive == self.accept_one().await? {}
104 Ok(())
105 }
106
107 pub async fn accept_one(&mut self) -> http_types::Result<ConnectionStatus>
109 where
110 RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
111 F: Fn(Request) -> Fut,
112 Fut: Future<Output = http_types::Result<Response>>,
113 {
114 let fut = decode(self.io.clone());
116
117 let (req, mut body) = if let Some(timeout_duration) = self.opts.headers_timeout {
118 match fut
119 .or(async {
120 Timer::after(timeout_duration).await;
121 Ok(None)
122 })
123 .await
124 {
125 Ok(Some(r)) => r,
126 Ok(None) => return Ok(ConnectionStatus::Close), Err(e) => return Err(e),
128 }
129 } else {
130 match fut.await? {
131 Some(r) => r,
132 None => return Ok(ConnectionStatus::Close), }
134 };
135
136 let has_upgrade_header = req.header(UPGRADE).is_some();
137 let connection_header_as_str = req
138 .header(CONNECTION)
139 .map(|connection| connection.as_str())
140 .unwrap_or("");
141
142 let connection_header_is_upgrade = connection_header_as_str
143 .split(',')
144 .any(|s| s.trim().eq_ignore_ascii_case("upgrade"));
145 let mut close_connection = connection_header_as_str.eq_ignore_ascii_case("close");
146
147 let upgrade_requested = has_upgrade_header && connection_header_is_upgrade;
148
149 let method = req.method();
150
151 let mut res = (self.endpoint)(req).await?;
153
154 close_connection |= res
155 .header(CONNECTION)
156 .map(|c| c.as_str().eq_ignore_ascii_case("close"))
157 .unwrap_or(false);
158
159 let upgrade_provided = res.status() == StatusCode::SwitchingProtocols && res.has_upgrade();
160
161 let upgrade_sender = if upgrade_requested && upgrade_provided {
162 Some(res.send_upgrade())
163 } else {
164 None
165 };
166
167 let mut encoder = Encoder::new(res, method);
168
169 let bytes_written = io::copy(&mut encoder, &mut self.io).await?;
170 log::trace!("wrote {} response bytes", bytes_written);
171
172 let body_bytes_discarded = io::copy(&mut body, &mut io::sink()).await?;
173 log::trace!(
174 "discarded {} unread request body bytes",
175 body_bytes_discarded
176 );
177
178 if let Some(upgrade_sender) = upgrade_sender {
179 upgrade_sender.send(Connection::new(self.io.clone())).await;
180 Ok(ConnectionStatus::Close)
181 } else if close_connection {
182 Ok(ConnectionStatus::Close)
183 } else {
184 Ok(ConnectionStatus::KeepAlive)
185 }
186 }
187}