pingora_core/apps/
mod.rs

1// Copyright 2024 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The abstraction and implementation interface for service application logic
16
17pub mod http_app;
18pub mod prometheus_http_app;
19
20use crate::server::ShutdownWatch;
21use async_trait::async_trait;
22use log::{debug, error};
23use std::future::poll_fn;
24use std::sync::Arc;
25
26use crate::protocols::http::v2::server;
27use crate::protocols::http::ServerSession;
28use crate::protocols::Digest;
29use crate::protocols::Stream;
30use crate::protocols::ALPN;
31
32// https://datatracker.ietf.org/doc/html/rfc9113#section-3.4
33const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
34
35#[async_trait]
36/// This trait defines the interface of a transport layer (TCP or TLS) application.
37pub trait ServerApp {
38    /// Whenever a new connection is established, this function will be called with the established
39    /// [`Stream`] object provided.
40    ///
41    /// The application can do whatever it wants with the `session`.
42    ///
43    /// After processing the `session`, if the `session`'s connection is reusable, This function
44    /// can return it to the service by returning `Some(session)`. The returned `session` will be
45    /// fed to another [`Self::process_new()`] for another round of processing.
46    /// If not reusable, `None` should be returned.
47    ///
48    /// The `shutdown` argument will change from `false` to `true` when the server receives a
49    /// signal to shutdown. This argument allows the application to react accordingly.
50    async fn process_new(
51        self: &Arc<Self>,
52        mut session: Stream,
53        // TODO: make this ShutdownWatch so that all task can await on this event
54        shutdown: &ShutdownWatch,
55    ) -> Option<Stream>;
56
57    /// This callback will be called once after the service stops listening to its endpoints.
58    async fn cleanup(&self) {}
59}
60#[non_exhaustive]
61#[derive(Default)]
62/// HTTP Server options that control how the server handles some transport types.
63pub struct HttpServerOptions {
64    /// Use HTTP/2 for plaintext.
65    pub h2c: bool,
66}
67
68/// This trait defines the interface of an HTTP application.
69#[async_trait]
70pub trait HttpServerApp {
71    /// Similar to the [`ServerApp`], this function is called whenever a new HTTP session is established.
72    ///
73    /// After successful processing, [`ServerSession::finish()`] can be called to return an optionally reusable
74    /// connection back to the service. The caller needs to make sure that the connection is in a reusable state
75    /// i.e., no error or incomplete read or write headers or bodies. Otherwise a `None` should be returned.
76    async fn process_new_http(
77        self: &Arc<Self>,
78        mut session: ServerSession,
79        // TODO: make this ShutdownWatch so that all task can await on this event
80        shutdown: &ShutdownWatch,
81    ) -> Option<Stream>;
82
83    /// Provide options on how HTTP/2 connection should be established. This function will be called
84    /// every time a new HTTP/2 **connection** needs to be established.
85    ///
86    /// A `None` means to use the built-in default options. See [`server::H2Options`] for more details.
87    fn h2_options(&self) -> Option<server::H2Options> {
88        None
89    }
90
91    /// Provide HTTP server options used to override default behavior. This function will be called
92    /// every time a new connection is processed.
93    ///
94    /// A `None` means no server options will be applied.
95    fn server_options(&self) -> Option<&HttpServerOptions> {
96        None
97    }
98
99    async fn http_cleanup(&self) {}
100}
101
102#[async_trait]
103impl<T> ServerApp for T
104where
105    T: HttpServerApp + Send + Sync + 'static,
106{
107    async fn process_new(
108        self: &Arc<Self>,
109        mut stream: Stream,
110        shutdown: &ShutdownWatch,
111    ) -> Option<Stream> {
112        let mut h2c = self.server_options().as_ref().map_or(false, |o| o.h2c);
113
114        // try to read h2 preface
115        if h2c {
116            let mut buf = [0u8; H2_PREFACE.len()];
117            let peeked = stream
118                .try_peek(&mut buf)
119                .await
120                .map_err(|e| {
121                    // this error is normal when h1 reuse and close the connection
122                    debug!("Read error while peeking h2c preface {e}");
123                    e
124                })
125                .ok()?;
126            // not all streams support peeking
127            if peeked {
128                // turn off h2c (use h1) if h2 preface doesn't exist
129                h2c = buf == H2_PREFACE;
130            }
131        }
132        if h2c || matches!(stream.selected_alpn_proto(), Some(ALPN::H2)) {
133            // create a shared connection digest
134            let digest = Arc::new(Digest {
135                ssl_digest: stream.get_ssl_digest(),
136                // TODO: log h2 handshake time
137                timing_digest: stream.get_timing_digest(),
138                proxy_digest: stream.get_proxy_digest(),
139                socket_digest: stream.get_socket_digest(),
140            });
141
142            let h2_options = self.h2_options();
143            let h2_conn = server::handshake(stream, h2_options).await;
144            let mut h2_conn = match h2_conn {
145                Err(e) => {
146                    error!("H2 handshake error {e}");
147                    return None;
148                }
149                Ok(c) => c,
150            };
151
152            let mut shutdown = shutdown.clone();
153            loop {
154                // this loop ends when the client decides to close the h2 conn
155                // TODO: add a timeout?
156                let h2_stream = tokio::select! {
157                    _ = shutdown.changed() => {
158                        h2_conn.graceful_shutdown();
159                        let _ = poll_fn(|cx| h2_conn.poll_closed(cx))
160                            .await.map_err(|e| error!("H2 error waiting for shutdown {e}"));
161                        return None;
162                    }
163                    h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()) => h2_stream
164                };
165                let h2_stream = match h2_stream {
166                    Err(e) => {
167                        // It is common for the client to just disconnect TCP without properly
168                        // closing H2. So we don't log the errors here
169                        debug!("H2 error when accepting new stream {e}");
170                        return None;
171                    }
172                    Ok(s) => s?, // None means the connection is ready to be closed
173                };
174                let app = self.clone();
175                let shutdown = shutdown.clone();
176                pingora_runtime::current_handle().spawn(async move {
177                    app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
178                        .await;
179                });
180            }
181        } else {
182            // No ALPN or ALPN::H1 and h2c was not configured, fallback to HTTP/1.1
183            self.process_new_http(ServerSession::new_http1(stream), shutdown)
184                .await
185        }
186    }
187
188    async fn cleanup(&self) {
189        self.http_cleanup().await;
190    }
191}