kube_client/api/
remote_command.rs

1use std::future::Future;
2
3use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status;
4
5use futures::{
6    channel::{mpsc, oneshot},
7    FutureExt, SinkExt, StreamExt,
8};
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11use tokio::{
12    io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream},
13    select,
14};
15use tokio_tungstenite::{
16    tungstenite::{self as ws},
17    WebSocketStream,
18};
19
20use super::AttachParams;
21
22type StatusReceiver = oneshot::Receiver<Status>;
23type StatusSender = oneshot::Sender<Status>;
24
25type TerminalSizeReceiver = mpsc::Receiver<TerminalSize>;
26type TerminalSizeSender = mpsc::Sender<TerminalSize>;
27
28/// TerminalSize define the size of a terminal
29#[derive(Debug, Serialize, Deserialize)]
30#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
31#[serde(rename_all = "PascalCase")]
32pub struct TerminalSize {
33    /// width of the terminal
34    pub width: u16,
35    /// height of the terminal
36    pub height: u16,
37}
38
39/// Errors from attaching to a pod.
40#[derive(Debug, Error)]
41pub enum Error {
42    /// Failed to read from stdin
43    #[error("failed to read from stdin: {0}")]
44    ReadStdin(#[source] std::io::Error),
45
46    /// Failed to send stdin data to the pod
47    #[error("failed to send a stdin data: {0}")]
48    SendStdin(#[source] ws::Error),
49
50    /// Failed to write to stdout
51    #[error("failed to write to stdout: {0}")]
52    WriteStdout(#[source] std::io::Error),
53
54    /// Failed to write to stderr
55    #[error("failed to write to stderr: {0}")]
56    WriteStderr(#[source] std::io::Error),
57
58    /// Failed to receive a WebSocket message from the server.
59    #[error("failed to receive a WebSocket message: {0}")]
60    ReceiveWebSocketMessage(#[source] ws::Error),
61
62    // Failed to complete the background task
63    #[error("failed to complete the background task: {0}")]
64    Spawn(#[source] tokio::task::JoinError),
65
66    /// Failed to send close message.
67    #[error("failed to send a WebSocket close message: {0}")]
68    SendClose(#[source] ws::Error),
69
70    /// Failed to deserialize status object
71    #[error("failed to deserialize status object: {0}")]
72    DeserializeStatus(#[source] serde_json::Error),
73
74    /// Failed to send status object
75    #[error("failed to send status object")]
76    SendStatus,
77
78    /// Fail to serialize Terminalsize object
79    #[error("failed to serialize TerminalSize object: {0}")]
80    SerializeTerminalSize(#[source] serde_json::Error),
81
82    /// Fail to send terminal size message
83    #[error("failed to send terminal size message")]
84    SendTerminalSize(#[source] ws::Error),
85
86    /// Failed to set terminal size, tty need to be true to resize the terminal
87    #[error("failed to set terminal size, tty need to be true to resize the terminal")]
88    TtyNeedToBeTrue,
89}
90
91const MAX_BUF_SIZE: usize = 1024;
92
93/// Represents an attached process in a container for [`attach`] and [`exec`].
94///
95/// Provides access to `stdin`, `stdout`, and `stderr` if attached.
96///
97/// Use [`AttachedProcess::join`] to wait for the process to terminate.
98///
99/// [`attach`]: crate::Api::attach
100/// [`exec`]: crate::Api::exec
101#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
102pub struct AttachedProcess {
103    has_stdin: bool,
104    has_stdout: bool,
105    has_stderr: bool,
106    stdin_writer: Option<DuplexStream>,
107    stdout_reader: Option<DuplexStream>,
108    stderr_reader: Option<DuplexStream>,
109    status_rx: Option<StatusReceiver>,
110    terminal_resize_tx: Option<TerminalSizeSender>,
111    task: tokio::task::JoinHandle<Result<(), Error>>,
112}
113
114impl AttachedProcess {
115    pub(crate) fn new<S>(stream: WebSocketStream<S>, ap: &AttachParams) -> Self
116    where
117        S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static,
118    {
119        // To simplify the implementation, always create a pipe for stdin.
120        // The caller does not have access to it unless they had requested.
121        let (stdin_writer, stdin_reader) = tokio::io::duplex(ap.max_stdin_buf_size.unwrap_or(MAX_BUF_SIZE));
122        let (stdout_writer, stdout_reader) = if ap.stdout {
123            let (w, r) = tokio::io::duplex(ap.max_stdout_buf_size.unwrap_or(MAX_BUF_SIZE));
124            (Some(w), Some(r))
125        } else {
126            (None, None)
127        };
128        let (stderr_writer, stderr_reader) = if ap.stderr {
129            let (w, r) = tokio::io::duplex(ap.max_stderr_buf_size.unwrap_or(MAX_BUF_SIZE));
130            (Some(w), Some(r))
131        } else {
132            (None, None)
133        };
134        let (status_tx, status_rx) = oneshot::channel();
135        let (terminal_resize_tx, terminal_resize_rx) = if ap.tty {
136            let (w, r) = mpsc::channel(10);
137            (Some(w), Some(r))
138        } else {
139            (None, None)
140        };
141
142        let task = tokio::spawn(start_message_loop(
143            stream,
144            stdin_reader,
145            stdout_writer,
146            stderr_writer,
147            status_tx,
148            terminal_resize_rx,
149        ));
150
151        AttachedProcess {
152            has_stdin: ap.stdin,
153            has_stdout: ap.stdout,
154            has_stderr: ap.stderr,
155            task,
156            stdin_writer: Some(stdin_writer),
157            stdout_reader,
158            stderr_reader,
159            terminal_resize_tx,
160            status_rx: Some(status_rx),
161        }
162    }
163
164    /// Async writer to stdin.
165    /// ```no_run
166    /// # use kube_client::api::AttachedProcess;
167    /// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
168    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
169    /// # let attached: AttachedProcess = todo!();
170    /// let mut stdin_writer = attached.stdin().unwrap();
171    /// stdin_writer.write(b"foo\n").await?;
172    /// # Ok(())
173    /// # }
174    /// ```
175    /// Only available if [`AttachParams`](super::AttachParams) had `stdin`.
176    pub fn stdin(&mut self) -> Option<impl AsyncWrite + Unpin> {
177        if !self.has_stdin {
178            return None;
179        }
180        self.stdin_writer.take()
181    }
182
183    /// Async reader for stdout outputs.
184    /// ```no_run
185    /// # use kube_client::api::AttachedProcess;
186    /// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
187    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
188    /// # let attached: AttachedProcess = todo!();
189    /// let mut stdout_reader = attached.stdout().unwrap();
190    /// let mut buf = [0u8; 4];
191    /// stdout_reader.read_exact(&mut buf).await?;
192    /// # Ok(())
193    /// # }
194    /// ```
195    /// Only available if [`AttachParams`](super::AttachParams) had `stdout`.
196    pub fn stdout(&mut self) -> Option<impl AsyncRead + Unpin> {
197        if !self.has_stdout {
198            return None;
199        }
200        self.stdout_reader.take()
201    }
202
203    /// Async reader for stderr outputs.
204    /// ```no_run
205    /// # use kube_client::api::AttachedProcess;
206    /// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
207    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
208    /// # let attached: AttachedProcess = todo!();
209    /// let mut stderr_reader = attached.stderr().unwrap();
210    /// let mut buf = [0u8; 4];
211    /// stderr_reader.read_exact(&mut buf).await?;
212    /// # Ok(())
213    /// # }
214    /// ```
215    /// Only available if [`AttachParams`](super::AttachParams) had `stderr`.
216    pub fn stderr(&mut self) -> Option<impl AsyncRead + Unpin> {
217        if !self.has_stderr {
218            return None;
219        }
220        self.stderr_reader.take()
221    }
222
223    /// Abort the background task, causing remote command to fail.
224    #[inline]
225    pub fn abort(&self) {
226        self.task.abort();
227    }
228
229    /// Waits for the remote command task to complete.
230    pub async fn join(self) -> Result<(), Error> {
231        self.task.await.unwrap_or_else(|e| Err(Error::Spawn(e)))
232    }
233
234    /// Take a future that resolves with any status object or when the sender is dropped.
235    ///
236    /// Returns `None` if called more than once.
237    pub fn take_status(&mut self) -> Option<impl Future<Output = Option<Status>>> {
238        self.status_rx.take().map(|recv| recv.map(|res| res.ok()))
239    }
240
241    /// Async writer to change the terminal size
242    /// ```no_run
243    /// # use kube_client::api::{AttachedProcess, TerminalSize};
244    /// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
245    /// # use futures::SinkExt;
246    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
247    /// # let attached: AttachedProcess = todo!();
248    /// let mut terminal_size_writer = attached.terminal_size().unwrap();
249    /// terminal_size_writer.send(TerminalSize{
250    ///     height: 100,
251    ///     width: 200,
252    /// }).await?;
253    /// # Ok(())
254    /// # }
255    /// ```
256    /// Only available if [`AttachParams`](super::AttachParams) had `tty`.
257    pub fn terminal_size(&mut self) -> Option<TerminalSizeSender> {
258        self.terminal_resize_tx.take()
259    }
260}
261
262// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cri/streaming/remotecommand/websocket.go#L34
263const STDIN_CHANNEL: u8 = 0;
264const STDOUT_CHANNEL: u8 = 1;
265const STDERR_CHANNEL: u8 = 2;
266// status channel receives `Status` object on exit.
267const STATUS_CHANNEL: u8 = 3;
268// resize channel is use to send TerminalSize object to change the size of the terminal
269const RESIZE_CHANNEL: u8 = 4;
270
271async fn start_message_loop<S>(
272    stream: WebSocketStream<S>,
273    stdin: impl AsyncRead + Unpin,
274    mut stdout: Option<impl AsyncWrite + Unpin>,
275    mut stderr: Option<impl AsyncWrite + Unpin>,
276    status_tx: StatusSender,
277    mut terminal_size_rx: Option<TerminalSizeReceiver>,
278) -> Result<(), Error>
279where
280    S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static,
281{
282    let mut stdin_stream = tokio_util::io::ReaderStream::new(stdin);
283    let (mut server_send, raw_server_recv) = stream.split();
284    // Work with filtered messages to reduce noise.
285    let mut server_recv = raw_server_recv.filter_map(filter_message).boxed();
286    let mut have_terminal_size_rx = terminal_size_rx.is_some();
287
288    loop {
289        let terminal_size_next = async {
290            match terminal_size_rx.as_mut() {
291                Some(tmp) => Some(tmp.next().await),
292                None => None,
293            }
294        };
295        select! {
296            server_message = server_recv.next() => {
297                match server_message {
298                    Some(Ok(Message::Stdout(bin))) => {
299                        if let Some(stdout) = stdout.as_mut() {
300                            stdout.write_all(&bin[1..]).await.map_err(Error::WriteStdout)?;
301                        }
302                    },
303                    Some(Ok(Message::Stderr(bin))) => {
304                        if let Some(stderr) = stderr.as_mut() {
305                            stderr.write_all(&bin[1..]).await.map_err(Error::WriteStderr)?;
306                        }
307                    },
308                    Some(Ok(Message::Status(bin))) => {
309                        let status = serde_json::from_slice::<Status>(&bin[1..]).map_err(Error::DeserializeStatus)?;
310                        status_tx.send(status).map_err(|_| Error::SendStatus)?;
311                        break
312                    },
313                    Some(Err(err)) => {
314                        return Err(Error::ReceiveWebSocketMessage(err));
315                    },
316                    None => {
317                        // Connection closed properly
318                        break
319                    },
320                }
321            },
322            stdin_message = stdin_stream.next() => {
323                match stdin_message {
324                    Some(Ok(bytes)) => {
325                        if !bytes.is_empty() {
326                            let mut vec = Vec::with_capacity(bytes.len() + 1);
327                            vec.push(STDIN_CHANNEL);
328                            vec.extend_from_slice(&bytes[..]);
329                            server_send
330                                .send(ws::Message::binary(vec))
331                                .await
332                                .map_err(Error::SendStdin)?;
333                        }
334                    },
335                    Some(Err(err)) => {
336                        return Err(Error::ReadStdin(err));
337                    }
338                    None => {
339                        // Stdin closed (writer half dropped).
340                        // Let the server know and disconnect.
341                        server_send.close().await.map_err(Error::SendClose)?;
342                        break;
343                    }
344                }
345            },
346            Some(terminal_size_message) = terminal_size_next, if have_terminal_size_rx => {
347                match terminal_size_message {
348                    Some(new_size) => {
349                        let new_size = serde_json::to_vec(&new_size).map_err(Error::SerializeTerminalSize)?;
350                        let mut vec = Vec::with_capacity(new_size.len() + 1);
351                        vec.push(RESIZE_CHANNEL);
352                        vec.extend_from_slice(&new_size[..]);
353                        server_send.send(ws::Message::Binary(vec.into())).await.map_err(Error::SendTerminalSize)?;
354                    },
355                    None => {
356                        have_terminal_size_rx = false;
357                    }
358                }
359            },
360        }
361    }
362
363    Ok(())
364}
365
366/// Channeled messages from the server.
367enum Message {
368    /// To Stdout channel (1)
369    Stdout(Vec<u8>),
370    /// To stderr channel (2)
371    Stderr(Vec<u8>),
372    /// To error/status channel (3)
373    Status(Vec<u8>),
374}
375
376// Filter to reduce all the possible WebSocket messages into a few we expect to receive.
377async fn filter_message(wsm: Result<ws::Message, ws::Error>) -> Option<Result<Message, ws::Error>> {
378    match wsm {
379        // The protocol only sends binary frames.
380        // Message of size 1 (only channel number) is sent on connection.
381        Ok(ws::Message::Binary(bin)) if bin.len() > 1 => match bin[0] {
382            STDOUT_CHANNEL => Some(Ok(Message::Stdout(bin.into()))),
383            STDERR_CHANNEL => Some(Ok(Message::Stderr(bin.into()))),
384            STATUS_CHANNEL => Some(Ok(Message::Status(bin.into()))),
385            // We don't receive messages to stdin and resize channels.
386            _ => None,
387        },
388        // Ignore any other message types.
389        // We can ignore close message because the server never sends anything special.
390        // The connection terminates on `None`.
391        Ok(_) => None,
392        // Fatal errors. `WebSocketStream` turns `ConnectionClosed` and `AlreadyClosed` into `None`.
393        // So these are unrecoverables.
394        Err(err) => Some(Err(err)),
395    }
396}