kube_client/client/
upgrade.rs1use http::{self, HeaderValue, Response, StatusCode};
2use thiserror::Error;
3use tokio_tungstenite::tungstenite as ws;
4
5use crate::{client::Body, Error, Result};
6
7#[derive(Debug)]
8pub enum StreamProtocol {
9 V4,
11
12 V5,
16}
17
18impl StreamProtocol {
19 pub fn as_str(&self) -> &'static str {
20 match self {
21 Self::V4 => "v4.channel.k8s.io",
22 Self::V5 => "v5.channel.k8s.io",
23 }
24 }
25
26 fn as_bytes(&self) -> &'static [u8] {
27 self.as_str().as_bytes()
28 }
29
30 pub fn supports_stream_close(&self) -> bool {
31 match self {
32 Self::V4 => false,
33 Self::V5 => true,
34 }
35 }
36
37 pub fn add_to_headers(headers: &mut http::HeaderMap) -> Result<()> {
39 let supported_protocols = [
41 Self::V5.as_str(),
43 Self::V4.as_str(),
49 ];
50
51 let header_value_string = supported_protocols.join(", ");
52
53 headers.insert(
55 http::header::SEC_WEBSOCKET_PROTOCOL,
56 HeaderValue::from_str(&header_value_string).map_err(|e| Error::HttpError(e.into()))?,
57 );
58
59 Ok(())
60 }
61
62 fn get_from_response<B>(res: &Response<B>) -> Option<Self> {
64 let headers = res.headers();
65
66 match headers
67 .get(http::header::SEC_WEBSOCKET_PROTOCOL)
68 .map(|h| h.as_bytes())
69 {
70 Some(protocol) => {
71 if protocol == Self::V4.as_bytes() {
72 Some(Self::V4)
73 } else if protocol == Self::V5.as_bytes() {
74 Some(Self::V5)
75 } else {
76 None
77 }
78 }
79 _ => None,
80 }
81 }
82}
83
84#[cfg(feature = "ws")]
86#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
87#[derive(Debug, Error)]
88pub enum UpgradeConnectionError {
89 #[error("failed to switch protocol: {0}")]
94 ProtocolSwitch(http::status::StatusCode),
95
96 #[error("upgrade header was not set to websocket")]
98 MissingUpgradeWebSocketHeader,
99
100 #[error("connection header was not set to Upgrade")]
102 MissingConnectionUpgradeHeader,
103
104 #[error("Sec-WebSocket-Accept key mismatched")]
106 SecWebSocketAcceptKeyMismatch,
107
108 #[error("Sec-WebSocket-Protocol mismatched")]
110 SecWebSocketProtocolMismatch,
111
112 #[error("failed to get pending HTTP upgrade: {0}")]
114 GetPendingUpgrade(#[source] hyper::Error),
115}
116
117pub fn verify_response(res: &Response<Body>, key: &str) -> Result<StreamProtocol, UpgradeConnectionError> {
120 if res.status() != StatusCode::SWITCHING_PROTOCOLS {
121 return Err(UpgradeConnectionError::ProtocolSwitch(res.status()));
122 }
123
124 let headers = res.headers();
125 if !headers
126 .get(http::header::UPGRADE)
127 .and_then(|h| h.to_str().ok())
128 .map(|h| h.eq_ignore_ascii_case("websocket"))
129 .unwrap_or(false)
130 {
131 return Err(UpgradeConnectionError::MissingUpgradeWebSocketHeader);
132 }
133
134 if !headers
135 .get(http::header::CONNECTION)
136 .and_then(|h| h.to_str().ok())
137 .map(|h| h.eq_ignore_ascii_case("Upgrade"))
138 .unwrap_or(false)
139 {
140 return Err(UpgradeConnectionError::MissingConnectionUpgradeHeader);
141 }
142
143 let accept_key = ws::handshake::derive_accept_key(key.as_ref());
144 if !headers
145 .get(http::header::SEC_WEBSOCKET_ACCEPT)
146 .map(|h| h == &accept_key)
147 .unwrap_or(false)
148 {
149 return Err(UpgradeConnectionError::SecWebSocketAcceptKeyMismatch);
150 }
151
152 let protocol = match StreamProtocol::get_from_response(res) {
154 Some(p) => p,
155 None => return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch),
156 };
157
158 Ok(protocol)
159}