kube_client/client/
upgrade.rs

1use http::{self, HeaderValue, Response, StatusCode};
2use thiserror::Error;
3use tokio_tungstenite::tungstenite as ws;
4
5use crate::{client::Body, Error, Result};
6
7#[derive(Debug)]
8pub enum StreamProtocol {
9    /// Binary subprotocol v4. See `Client::connect`.
10    V4,
11
12    /// Binary subprotocol v5. See `Client::connect`.
13    /// v5 supports CLOSE signals.
14    /// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L52C26-L52C43
15    V5,
16}
17
18impl StreamProtocol {
19    pub fn as_str(&self) -> &'static str {
20        match self {
21            Self::V4 => "v4.channel.k8s.io",
22            Self::V5 => "v5.channel.k8s.io",
23        }
24    }
25
26    fn as_bytes(&self) -> &'static [u8] {
27        self.as_str().as_bytes()
28    }
29
30    pub fn supports_stream_close(&self) -> bool {
31        match self {
32            Self::V4 => false,
33            Self::V5 => true,
34        }
35    }
36
37    /// Add HTTP header SEC_WEBSOCKET_PROTOCOL with a list of supported protocol.
38    pub fn add_to_headers(headers: &mut http::HeaderMap) -> Result<()> {
39        // Protocols we support in our preferred order.
40        let supported_protocols = [
41            // v5 supports CLOSE signals.
42            Self::V5.as_str(),
43            // Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
44            // There's no official documentation about this protocol, but it's described in
45            // [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
46            // There's a comment about v4 and `Status` object in
47            // [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
48            Self::V4.as_str(),
49        ];
50
51        let header_value_string = supported_protocols.join(", ");
52
53        // Note: Multiple headers does not work. Only a single CSV works.
54        headers.insert(
55            http::header::SEC_WEBSOCKET_PROTOCOL,
56            HeaderValue::from_str(&header_value_string).map_err(|e| Error::HttpError(e.into()))?,
57        );
58
59        Ok(())
60    }
61
62    /// Return the subprotocol of an HTTP response.
63    fn get_from_response<B>(res: &Response<B>) -> Option<Self> {
64        let headers = res.headers();
65
66        match headers
67            .get(http::header::SEC_WEBSOCKET_PROTOCOL)
68            .map(|h| h.as_bytes())
69        {
70            Some(protocol) => {
71                if protocol == Self::V4.as_bytes() {
72                    Some(Self::V4)
73                } else if protocol == Self::V5.as_bytes() {
74                    Some(Self::V5)
75                } else {
76                    None
77                }
78            }
79            _ => None,
80        }
81    }
82}
83
84/// Possible errors from upgrading to a WebSocket connection
85#[cfg(feature = "ws")]
86#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
87#[derive(Debug, Error)]
88pub enum UpgradeConnectionError {
89    /// The server did not respond with [`SWITCHING_PROTOCOLS`] status when upgrading the
90    /// connection.
91    ///
92    /// [`SWITCHING_PROTOCOLS`]: http::status::StatusCode::SWITCHING_PROTOCOLS
93    #[error("failed to switch protocol: {0}")]
94    ProtocolSwitch(http::status::StatusCode),
95
96    /// `Upgrade` header was not set to `websocket` (case insensitive)
97    #[error("upgrade header was not set to websocket")]
98    MissingUpgradeWebSocketHeader,
99
100    /// `Connection` header was not set to `Upgrade` (case insensitive)
101    #[error("connection header was not set to Upgrade")]
102    MissingConnectionUpgradeHeader,
103
104    /// `Sec-WebSocket-Accept` key mismatched.
105    #[error("Sec-WebSocket-Accept key mismatched")]
106    SecWebSocketAcceptKeyMismatch,
107
108    /// `Sec-WebSocket-Protocol` mismatched.
109    #[error("Sec-WebSocket-Protocol mismatched")]
110    SecWebSocketProtocolMismatch,
111
112    /// Failed to get pending HTTP upgrade.
113    #[error("failed to get pending HTTP upgrade: {0}")]
114    GetPendingUpgrade(#[source] hyper::Error),
115}
116
117// Verify upgrade response according to RFC6455.
118// Based on `tungstenite` and added subprotocol verification.
119pub fn verify_response(res: &Response<Body>, key: &str) -> Result<StreamProtocol, UpgradeConnectionError> {
120    if res.status() != StatusCode::SWITCHING_PROTOCOLS {
121        return Err(UpgradeConnectionError::ProtocolSwitch(res.status()));
122    }
123
124    let headers = res.headers();
125    if !headers
126        .get(http::header::UPGRADE)
127        .and_then(|h| h.to_str().ok())
128        .map(|h| h.eq_ignore_ascii_case("websocket"))
129        .unwrap_or(false)
130    {
131        return Err(UpgradeConnectionError::MissingUpgradeWebSocketHeader);
132    }
133
134    if !headers
135        .get(http::header::CONNECTION)
136        .and_then(|h| h.to_str().ok())
137        .map(|h| h.eq_ignore_ascii_case("Upgrade"))
138        .unwrap_or(false)
139    {
140        return Err(UpgradeConnectionError::MissingConnectionUpgradeHeader);
141    }
142
143    let accept_key = ws::handshake::derive_accept_key(key.as_ref());
144    if !headers
145        .get(http::header::SEC_WEBSOCKET_ACCEPT)
146        .map(|h| h == &accept_key)
147        .unwrap_or(false)
148    {
149        return Err(UpgradeConnectionError::SecWebSocketAcceptKeyMismatch);
150    }
151
152    // Make sure that the server returned an expected subprotocol.
153    let protocol = match StreamProtocol::get_from_response(res) {
154        Some(p) => p,
155        None => return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch),
156    };
157
158    Ok(protocol)
159}