async_h1/server/
mod.rs

1//! Process HTTP connections on the server.
2
3use async_io::Timer;
4use futures_lite::io::{self, AsyncRead as Read, AsyncWrite as Write};
5use futures_lite::prelude::*;
6use http_types::headers::{CONNECTION, UPGRADE};
7use http_types::upgrade::Connection;
8use http_types::{Request, Response, StatusCode};
9use std::{future::Future, marker::PhantomData, time::Duration};
10mod body_reader;
11mod decode;
12mod encode;
13
14pub use decode::decode;
15pub use encode::Encoder;
16
/// Configure the server.
///
/// The value produced by `Default` uses a 60 second header timeout. Use
/// [`ServerOptions::with_headers_timeout`] to pick a different limit or to
/// disable the timeout entirely.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Timeout to handle headers. Defaults to 60s.
    ///
    /// `None` means decoding a request is never raced against a timer.
    headers_timeout: Option<Duration>,
}

impl ServerOptions {
    /// Returns these options with the header timeout replaced.
    ///
    /// Pass `Some(duration)` to bound how long decoding a request may take,
    /// or `None` to wait indefinitely.
    pub fn with_headers_timeout(mut self, headers_timeout: Option<Duration>) -> Self {
        self.headers_timeout = headers_timeout;
        self
    }

    /// Returns the configured header timeout, if any.
    pub fn headers_timeout(&self) -> Option<Duration> {
        self.headers_timeout
    }
}
23
24impl Default for ServerOptions {
25    fn default() -> Self {
26        Self {
27            headers_timeout: Some(Duration::from_secs(60)),
28        }
29    }
30}
31
32/// Accept a new incoming HTTP/1.1 connection.
33///
34/// Supports `KeepAlive` requests by default.
35pub async fn accept<RW, F, Fut>(io: RW, endpoint: F) -> http_types::Result<()>
36where
37    RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
38    F: Fn(Request) -> Fut,
39    Fut: Future<Output = http_types::Result<Response>>,
40{
41    Server::new(io, endpoint).accept().await
42}
43
44/// Accept a new incoming HTTP/1.1 connection.
45///
46/// Supports `KeepAlive` requests by default.
47pub async fn accept_with_opts<RW, F, Fut>(
48    io: RW,
49    endpoint: F,
50    opts: ServerOptions,
51) -> http_types::Result<()>
52where
53    RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
54    F: Fn(Request) -> Fut,
55    Fut: Future<Output = http_types::Result<Response>>,
56{
57    Server::new(io, endpoint).with_opts(opts).accept().await
58}
59
60/// struct for server
61#[derive(Debug)]
62pub struct Server<RW, F, Fut> {
63    io: RW,
64    endpoint: F,
65    opts: ServerOptions,
66    _phantom: PhantomData<Fut>,
67}
68
/// Outcome of handling one request: tells the caller whether the connection
/// may be used for a subsequent request.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// No further request should be accepted on this connection.
    Close,

    /// Another request may be accepted on this connection.
    KeepAlive,
}
78
79impl<RW, F, Fut> Server<RW, F, Fut>
80where
81    RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
82    F: Fn(Request) -> Fut,
83    Fut: Future<Output = http_types::Result<Response>>,
84{
85    /// builds a new server
86    pub fn new(io: RW, endpoint: F) -> Self {
87        Self {
88            io,
89            endpoint,
90            opts: Default::default(),
91            _phantom: PhantomData,
92        }
93    }
94
95    /// with opts
96    pub fn with_opts(mut self, opts: ServerOptions) -> Self {
97        self.opts = opts;
98        self
99    }
100
101    /// accept in a loop
102    pub async fn accept(&mut self) -> http_types::Result<()> {
103        while ConnectionStatus::KeepAlive == self.accept_one().await? {}
104        Ok(())
105    }
106
107    /// accept one request
108    pub async fn accept_one(&mut self) -> http_types::Result<ConnectionStatus>
109    where
110        RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
111        F: Fn(Request) -> Fut,
112        Fut: Future<Output = http_types::Result<Response>>,
113    {
114        // Decode a new request, timing out if this takes longer than the timeout duration.
115        let fut = decode(self.io.clone());
116
117        let (req, mut body) = if let Some(timeout_duration) = self.opts.headers_timeout {
118            match fut
119                .or(async {
120                    Timer::after(timeout_duration).await;
121                    Ok(None)
122                })
123                .await
124            {
125                Ok(Some(r)) => r,
126                Ok(None) => return Ok(ConnectionStatus::Close), /* EOF or timeout */
127                Err(e) => return Err(e),
128            }
129        } else {
130            match fut.await? {
131                Some(r) => r,
132                None => return Ok(ConnectionStatus::Close), /* EOF */
133            }
134        };
135
136        let has_upgrade_header = req.header(UPGRADE).is_some();
137        let connection_header_as_str = req
138            .header(CONNECTION)
139            .map(|connection| connection.as_str())
140            .unwrap_or("");
141
142        let connection_header_is_upgrade = connection_header_as_str
143            .split(',')
144            .any(|s| s.trim().eq_ignore_ascii_case("upgrade"));
145        let mut close_connection = connection_header_as_str.eq_ignore_ascii_case("close");
146
147        let upgrade_requested = has_upgrade_header && connection_header_is_upgrade;
148
149        let method = req.method();
150
151        // Pass the request to the endpoint and encode the response.
152        let mut res = (self.endpoint)(req).await?;
153
154        close_connection |= res
155            .header(CONNECTION)
156            .map(|c| c.as_str().eq_ignore_ascii_case("close"))
157            .unwrap_or(false);
158
159        let upgrade_provided = res.status() == StatusCode::SwitchingProtocols && res.has_upgrade();
160
161        let upgrade_sender = if upgrade_requested && upgrade_provided {
162            Some(res.send_upgrade())
163        } else {
164            None
165        };
166
167        let mut encoder = Encoder::new(res, method);
168
169        let bytes_written = io::copy(&mut encoder, &mut self.io).await?;
170        log::trace!("wrote {} response bytes", bytes_written);
171
172        let body_bytes_discarded = io::copy(&mut body, &mut io::sink()).await?;
173        log::trace!(
174            "discarded {} unread request body bytes",
175            body_bytes_discarded
176        );
177
178        if let Some(upgrade_sender) = upgrade_sender {
179            upgrade_sender.send(Connection::new(self.io.clone())).await;
180            Ok(ConnectionStatus::Close)
181        } else if close_connection {
182            Ok(ConnectionStatus::Close)
183        } else {
184            Ok(ConnectionStatus::KeepAlive)
185        }
186    }
187}