pingora_core/protocols/http/
client.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes::Bytes;
use pingora_error::Result;
use pingora_http::{RequestHeader, ResponseHeader};
use std::time::Duration;

use super::v1::client::HttpSession as Http1Session;
use super::v2::client::Http2Session;
use crate::protocols::{Digest, SocketAddr, Stream};

/// A type for Http client session. It can be either an Http1 connection or an Http2 stream.
pub enum HttpSession {
    H1(Http1Session),
    H2(Http2Session),
}

impl HttpSession {
    pub fn as_http1(&self) -> Option<&Http1Session> {
        match self {
            Self::H1(s) => Some(s),
            Self::H2(_) => None,
        }
    }

    pub fn as_http2(&self) -> Option<&Http2Session> {
        match self {
            Self::H1(_) => None,
            Self::H2(s) => Some(s),
        }
    }
    /// Write the request header to the server
    /// After the request header is sent. The caller can either start reading the response or
    /// sending request body if any.
    pub async fn write_request_header(&mut self, req: Box<RequestHeader>) -> Result<()> {
        match self {
            HttpSession::H1(h1) => {
                h1.write_request_header(req).await?;
                Ok(())
            }
            HttpSession::H2(h2) => h2.write_request_header(req, false),
        }
    }

    /// Write a chunk of the request body.
    pub async fn write_request_body(&mut self, data: Bytes, end: bool) -> Result<()> {
        match self {
            HttpSession::H1(h1) => {
                // TODO: maybe h1 should also have the concept of `end`
                h1.write_body(&data).await?;
                Ok(())
            }
            HttpSession::H2(h2) => h2.write_request_body(data, end),
        }
    }

    /// Signal that the request body has ended
    pub async fn finish_request_body(&mut self) -> Result<()> {
        match self {
            HttpSession::H1(h1) => {
                h1.finish_body().await?;
                Ok(())
            }
            HttpSession::H2(h2) => h2.finish_request_body(),
        }
    }

    /// Set the read timeout for reading header and body.
    ///
    /// The timeout is per read operation, not on the overall time reading the entire response
    pub fn set_read_timeout(&mut self, timeout: Duration) {
        match self {
            HttpSession::H1(h1) => h1.read_timeout = Some(timeout),
            HttpSession::H2(h2) => h2.read_timeout = Some(timeout),
        }
    }

    /// Set the write timeout for writing header and body.
    ///
    /// The timeout is per write operation, not on the overall time writing the entire request.
    ///
    /// This is a noop for h2.
    pub fn set_write_timeout(&mut self, timeout: Duration) {
        match self {
            HttpSession::H1(h1) => h1.write_timeout = Some(timeout),
            HttpSession::H2(_) => { /* no write timeout because the actual write happens async*/ }
        }
    }

    /// Read the response header from the server
    /// For http1, this function can be called multiple times, if the headers received are just
    /// informational headers.
    pub async fn read_response_header(&mut self) -> Result<()> {
        match self {
            HttpSession::H1(h1) => {
                h1.read_response().await?;
                Ok(())
            }
            HttpSession::H2(h2) => h2.read_response_header().await,
        }
    }

    /// Read response body
    ///
    /// `None` when no more body to read.
    pub async fn read_response_body(&mut self) -> Result<Option<Bytes>> {
        match self {
            HttpSession::H1(h1) => h1.read_body_bytes().await,
            HttpSession::H2(h2) => h2.read_response_body().await,
        }
    }

    /// No (more) body to read
    pub fn response_done(&mut self) -> bool {
        match self {
            HttpSession::H1(h1) => h1.is_body_done(),
            HttpSession::H2(h2) => h2.response_finished(),
        }
    }

    /// Give up the http session abruptly.
    /// For H1 this will close the underlying connection
    /// For H2 this will send RST_STREAM frame to end this stream if the stream has not ended at all
    pub async fn shutdown(&mut self) {
        match self {
            Self::H1(s) => s.shutdown().await,
            Self::H2(s) => s.shutdown(),
        }
    }

    /// Get the response header of the server
    ///
    /// `None` if the response header is not read yet.
    pub fn response_header(&self) -> Option<&ResponseHeader> {
        match self {
            Self::H1(s) => s.resp_header(),
            Self::H2(s) => s.response_header(),
        }
    }

    /// Return the [Digest] of the connection
    ///
    /// For reused connection, the timing in the digest will reflect its initial handshakes
    /// The caller should check if the connection is reused to avoid misuse of the timing field.
    pub fn digest(&self) -> Option<&Digest> {
        match self {
            Self::H1(s) => Some(s.digest()),
            Self::H2(s) => s.digest(),
        }
    }

    /// Return a mutable [Digest] reference for the connection.
    ///
    /// Will return `None` if this is an H2 session and multiple streams are open.
    pub fn digest_mut(&mut self) -> Option<&mut Digest> {
        match self {
            Self::H1(s) => Some(s.digest_mut()),
            Self::H2(s) => s.digest_mut(),
        }
    }

    /// Return the server (peer) address of the connection.
    pub fn server_addr(&self) -> Option<&SocketAddr> {
        match self {
            Self::H1(s) => s.server_addr(),
            Self::H2(s) => s.server_addr(),
        }
    }

    /// Return the client (local) address of the connection.
    pub fn client_addr(&self) -> Option<&SocketAddr> {
        match self {
            Self::H1(s) => s.client_addr(),
            Self::H2(s) => s.client_addr(),
        }
    }

    /// Get the reference of the [Stream] that this HTTP/1 session is operating upon.
    /// None if the HTTP session is over H2
    pub fn stream(&self) -> Option<&Stream> {
        match self {
            Self::H1(s) => Some(s.stream()),
            Self::H2(_) => None,
        }
    }
}