kube_client/api/
remote_command.rs

1use std::future::Future;
2
3use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status;
4
5use futures::{
6    channel::{mpsc, oneshot},
7    FutureExt, SinkExt, StreamExt,
8};
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11use tokio::{
12    io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream},
13    select,
14};
15use tokio_tungstenite::tungstenite as ws;
16
17use crate::client::Connection;
18
19use super::AttachParams;
20
21type StatusReceiver = oneshot::Receiver<Status>;
22type StatusSender = oneshot::Sender<Status>;
23
24type TerminalSizeReceiver = mpsc::Receiver<TerminalSize>;
25type TerminalSizeSender = mpsc::Sender<TerminalSize>;
26
27/// TerminalSize define the size of a terminal
28#[derive(Debug, Serialize, Deserialize)]
29#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
30#[serde(rename_all = "PascalCase")]
31pub struct TerminalSize {
32    /// width of the terminal
33    pub width: u16,
34    /// height of the terminal
35    pub height: u16,
36}
37
38/// Errors from attaching to a pod.
39#[derive(Debug, Error)]
40pub enum Error {
41    /// Failed to read from stdin
42    #[error("failed to read from stdin: {0}")]
43    ReadStdin(#[source] std::io::Error),
44
45    /// Failed to send stdin data to the pod
46    #[error("failed to send a stdin data: {0}")]
47    SendStdin(#[source] ws::Error),
48
49    /// Failed to write to stdout
50    #[error("failed to write to stdout: {0}")]
51    WriteStdout(#[source] std::io::Error),
52
53    /// Failed to write to stderr
54    #[error("failed to write to stderr: {0}")]
55    WriteStderr(#[source] std::io::Error),
56
57    /// Failed to receive a WebSocket message from the server.
58    #[error("failed to receive a WebSocket message: {0}")]
59    ReceiveWebSocketMessage(#[source] ws::Error),
60
61    // Failed to complete the background task
62    #[error("failed to complete the background task: {0}")]
63    Spawn(#[source] tokio::task::JoinError),
64
65    /// Failed to send close message.
66    #[error("failed to send a WebSocket close message: {0}")]
67    SendClose(#[source] ws::Error),
68
69    /// Failed to deserialize status object
70    #[error("failed to deserialize status object: {0}")]
71    DeserializeStatus(#[source] serde_json::Error),
72
73    /// Failed to send status object
74    #[error("failed to send status object")]
75    SendStatus,
76
77    /// Fail to serialize Terminalsize object
78    #[error("failed to serialize TerminalSize object: {0}")]
79    SerializeTerminalSize(#[source] serde_json::Error),
80
81    /// Fail to send terminal size message
82    #[error("failed to send terminal size message")]
83    SendTerminalSize(#[source] ws::Error),
84
85    /// Failed to set terminal size, tty need to be true to resize the terminal
86    #[error("failed to set terminal size, tty need to be true to resize the terminal")]
87    TtyNeedToBeTrue,
88}
89
90const MAX_BUF_SIZE: usize = 1024;
91
92/// Represents an attached process in a container for [`attach`] and [`exec`].
93///
94/// Provides access to `stdin`, `stdout`, and `stderr` if attached.
95///
96/// Use [`AttachedProcess::join`] to wait for the process to terminate.
97///
98/// [`attach`]: crate::Api::attach
99/// [`exec`]: crate::Api::exec
100#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
101pub struct AttachedProcess {
102    has_stdin: bool,
103    has_stdout: bool,
104    has_stderr: bool,
105    stdin_writer: Option<DuplexStream>,
106    stdout_reader: Option<DuplexStream>,
107    stderr_reader: Option<DuplexStream>,
108    status_rx: Option<StatusReceiver>,
109    terminal_resize_tx: Option<TerminalSizeSender>,
110    task: tokio::task::JoinHandle<Result<(), Error>>,
111}
112
113impl AttachedProcess {
114    pub(crate) fn new(connection: Connection, ap: &AttachParams) -> Self {
115        // To simplify the implementation, always create a pipe for stdin.
116        // The caller does not have access to it unless they had requested.
117        let (stdin_writer, stdin_reader) = tokio::io::duplex(ap.max_stdin_buf_size.unwrap_or(MAX_BUF_SIZE));
118        let (stdout_writer, stdout_reader) = if ap.stdout {
119            let (w, r) = tokio::io::duplex(ap.max_stdout_buf_size.unwrap_or(MAX_BUF_SIZE));
120            (Some(w), Some(r))
121        } else {
122            (None, None)
123        };
124        let (stderr_writer, stderr_reader) = if ap.stderr {
125            let (w, r) = tokio::io::duplex(ap.max_stderr_buf_size.unwrap_or(MAX_BUF_SIZE));
126            (Some(w), Some(r))
127        } else {
128            (None, None)
129        };
130        let (status_tx, status_rx) = oneshot::channel();
131        let (terminal_resize_tx, terminal_resize_rx) = if ap.tty {
132            let (w, r) = mpsc::channel(10);
133            (Some(w), Some(r))
134        } else {
135            (None, None)
136        };
137
138        let task = tokio::spawn(start_message_loop(
139            connection,
140            stdin_reader,
141            stdout_writer,
142            stderr_writer,
143            status_tx,
144            terminal_resize_rx,
145        ));
146
147        AttachedProcess {
148            has_stdin: ap.stdin,
149            has_stdout: ap.stdout,
150            has_stderr: ap.stderr,
151            task,
152            stdin_writer: Some(stdin_writer),
153            stdout_reader,
154            stderr_reader,
155            terminal_resize_tx,
156            status_rx: Some(status_rx),
157        }
158    }
159
160    /// Async writer to stdin.
161    /// ```no_run
162    /// # use kube_client::api::AttachedProcess;
163    /// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
164    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
165    /// # let attached: AttachedProcess = todo!();
166    /// let mut stdin_writer = attached.stdin().unwrap();
167    /// stdin_writer.write(b"foo\n").await?;
168    /// # Ok(())
169    /// # }
170    /// ```
171    /// Only available if [`AttachParams`](super::AttachParams) had `stdin`.
172    pub fn stdin(&mut self) -> Option<impl AsyncWrite + Unpin> {
173        if !self.has_stdin {
174            return None;
175        }
176        self.stdin_writer.take()
177    }
178
179    /// Async reader for stdout outputs.
180    /// ```no_run
181    /// # use kube_client::api::AttachedProcess;
182    /// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
183    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
184    /// # let attached: AttachedProcess = todo!();
185    /// let mut stdout_reader = attached.stdout().unwrap();
186    /// let mut buf = [0u8; 4];
187    /// stdout_reader.read_exact(&mut buf).await?;
188    /// # Ok(())
189    /// # }
190    /// ```
191    /// Only available if [`AttachParams`](super::AttachParams) had `stdout`.
192    pub fn stdout(&mut self) -> Option<impl AsyncRead + Unpin> {
193        if !self.has_stdout {
194            return None;
195        }
196        self.stdout_reader.take()
197    }
198
199    /// Async reader for stderr outputs.
200    /// ```no_run
201    /// # use kube_client::api::AttachedProcess;
202    /// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
203    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
204    /// # let attached: AttachedProcess = todo!();
205    /// let mut stderr_reader = attached.stderr().unwrap();
206    /// let mut buf = [0u8; 4];
207    /// stderr_reader.read_exact(&mut buf).await?;
208    /// # Ok(())
209    /// # }
210    /// ```
211    /// Only available if [`AttachParams`](super::AttachParams) had `stderr`.
212    pub fn stderr(&mut self) -> Option<impl AsyncRead + Unpin> {
213        if !self.has_stderr {
214            return None;
215        }
216        self.stderr_reader.take()
217    }
218
219    /// Abort the background task, causing remote command to fail.
220    #[inline]
221    pub fn abort(&self) {
222        self.task.abort();
223    }
224
225    /// Waits for the remote command task to complete.
226    pub async fn join(self) -> Result<(), Error> {
227        self.task.await.unwrap_or_else(|e| Err(Error::Spawn(e)))
228    }
229
230    /// Take a future that resolves with any status object or when the sender is dropped.
231    ///
232    /// Returns `None` if called more than once.
233    pub fn take_status(&mut self) -> Option<impl Future<Output = Option<Status>>> {
234        self.status_rx.take().map(|recv| recv.map(|res| res.ok()))
235    }
236
237    /// Async writer to change the terminal size
238    /// ```no_run
239    /// # use kube_client::api::{AttachedProcess, TerminalSize};
240    /// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
241    /// # use futures::SinkExt;
242    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
243    /// # let attached: AttachedProcess = todo!();
244    /// let mut terminal_size_writer = attached.terminal_size().unwrap();
245    /// terminal_size_writer.send(TerminalSize{
246    ///     height: 100,
247    ///     width: 200,
248    /// }).await?;
249    /// # Ok(())
250    /// # }
251    /// ```
252    /// Only available if [`AttachParams`](super::AttachParams) had `tty`.
253    pub fn terminal_size(&mut self) -> Option<TerminalSizeSender> {
254        self.terminal_resize_tx.take()
255    }
256}
257
258// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L57
259const STDIN_CHANNEL: u8 = 0;
260const STDOUT_CHANNEL: u8 = 1;
261const STDERR_CHANNEL: u8 = 2;
262// status channel receives `Status` object on exit.
263const STATUS_CHANNEL: u8 = 3;
264// resize channel is use to send TerminalSize object to change the size of the terminal
265const RESIZE_CHANNEL: u8 = 4;
266/// Used to signal that a channel has reached EOF. Only works on V5 of the protocol.
267const CLOSE_CHANNEL: u8 = 255;
268
269async fn start_message_loop(
270    connection: Connection,
271    stdin: impl AsyncRead + Unpin,
272    mut stdout: Option<impl AsyncWrite + Unpin>,
273    mut stderr: Option<impl AsyncWrite + Unpin>,
274    status_tx: StatusSender,
275    mut terminal_size_rx: Option<TerminalSizeReceiver>,
276) -> Result<(), Error> {
277    let supports_stream_close = connection.supports_stream_close();
278    let stream = connection.into_stream();
279    let mut stdin_stream = tokio_util::io::ReaderStream::new(stdin);
280    let (mut server_send, raw_server_recv) = stream.split();
281    // Work with filtered messages to reduce noise.
282    let mut server_recv = raw_server_recv.filter_map(filter_message).boxed();
283    let mut have_terminal_size_rx = terminal_size_rx.is_some();
284
285    // True until we reach EOF for stdin.
286    let mut stdin_is_open = true;
287
288    loop {
289        let terminal_size_next = async {
290            match terminal_size_rx.as_mut() {
291                Some(tmp) => Some(tmp.next().await),
292                None => None,
293            }
294        };
295        select! {
296            server_message = server_recv.next() => {
297                match server_message {
298                    Some(Ok(Message::Stdout(bin))) => {
299                        if let Some(stdout) = stdout.as_mut() {
300                            stdout.write_all(&bin[1..]).await.map_err(Error::WriteStdout)?;
301                        }
302                    },
303                    Some(Ok(Message::Stderr(bin))) => {
304                        if let Some(stderr) = stderr.as_mut() {
305                            stderr.write_all(&bin[1..]).await.map_err(Error::WriteStderr)?;
306                        }
307                    },
308                    Some(Ok(Message::Status(bin))) => {
309                        let status = serde_json::from_slice::<Status>(&bin[1..]).map_err(Error::DeserializeStatus)?;
310                        status_tx.send(status).map_err(|_| Error::SendStatus)?;
311                        break
312                    },
313                    Some(Err(err)) => {
314                        return Err(Error::ReceiveWebSocketMessage(err));
315                    },
316                    None => {
317                        // Connection closed properly
318                        break
319                    },
320                }
321            },
322            stdin_message = stdin_stream.next(), if stdin_is_open => {
323                match stdin_message {
324                    Some(Ok(bytes)) => {
325                        if !bytes.is_empty() {
326                            let mut vec = Vec::with_capacity(bytes.len() + 1);
327                            vec.push(STDIN_CHANNEL);
328                            vec.extend_from_slice(&bytes[..]);
329                            server_send
330                                .send(ws::Message::binary(vec))
331                                .await
332                                .map_err(Error::SendStdin)?;
333                        }
334                    },
335                    Some(Err(err)) => {
336                        return Err(Error::ReadStdin(err));
337                    }
338                    None => {
339                        // Stdin closed (writer half dropped).
340                        // Let the server know we reached the end of stdin.
341                        if supports_stream_close {
342                            // Signal stdin has reached EOF.
343                            // See: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go#L346
344                            let vec = vec![CLOSE_CHANNEL, STDIN_CHANNEL];
345                            server_send
346                                .send(ws::Message::binary(vec))
347                                .await
348                                .map_err(Error::SendStdin)?;
349                        } else {
350                            // Best we can do is trigger the whole websocket to close.
351                            // We may miss out on any remaining stdout data that has not
352                            // been sent yet.
353                            server_send.close().await.map_err(Error::SendClose)?;
354                        }
355
356                        // Do not check stdin_stream for data in future loops.
357                        stdin_is_open = false;
358                    }
359                }
360            },
361            Some(terminal_size_message) = terminal_size_next, if have_terminal_size_rx => {
362                match terminal_size_message {
363                    Some(new_size) => {
364                        let new_size = serde_json::to_vec(&new_size).map_err(Error::SerializeTerminalSize)?;
365                        let mut vec = Vec::with_capacity(new_size.len() + 1);
366                        vec.push(RESIZE_CHANNEL);
367                        vec.extend_from_slice(&new_size[..]);
368                        server_send.send(ws::Message::Binary(vec.into())).await.map_err(Error::SendTerminalSize)?;
369                    },
370                    None => {
371                        have_terminal_size_rx = false;
372                    }
373                }
374            },
375        }
376    }
377
378    Ok(())
379}
380
381/// Channeled messages from the server.
382enum Message {
383    /// To Stdout channel (1)
384    Stdout(Vec<u8>),
385    /// To stderr channel (2)
386    Stderr(Vec<u8>),
387    /// To error/status channel (3)
388    Status(Vec<u8>),
389}
390
391// Filter to reduce all the possible WebSocket messages into a few we expect to receive.
392async fn filter_message(wsm: Result<ws::Message, ws::Error>) -> Option<Result<Message, ws::Error>> {
393    match wsm {
394        // The protocol only sends binary frames.
395        // Message of size 1 (only channel number) is sent on connection.
396        Ok(ws::Message::Binary(bin)) if bin.len() > 1 => match bin[0] {
397            STDOUT_CHANNEL => Some(Ok(Message::Stdout(bin.into()))),
398            STDERR_CHANNEL => Some(Ok(Message::Stderr(bin.into()))),
399            STATUS_CHANNEL => Some(Ok(Message::Status(bin.into()))),
400            // We don't receive messages to stdin and resize channels.
401            _ => None,
402        },
403        // Ignore any other message types.
404        // We can ignore close message because the server never sends anything special.
405        // The connection terminates on `None`.
406        Ok(_) => None,
407        // Fatal errors. `WebSocketStream` turns `ConnectionClosed` and `AlreadyClosed` into `None`.
408        // So these are unrecoverables.
409        Err(err) => Some(Err(err)),
410    }
411}