wasmtime_wasi/
stdio.rs

1use crate::bindings::cli::{
2    stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr, terminal_stdin,
3    terminal_stdout,
4};
5use crate::pipe;
6use crate::{
7    InputStream, IoView, OutputStream, Pollable, StreamError, StreamResult, WasiImpl, WasiView,
8};
9use bytes::Bytes;
10use std::io::IsTerminal;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use wasmtime::component::Resource;
14use wasmtime_wasi_io::streams;
15
16/// A trait used to represent the standard input to a guest program.
17///
18/// This is used to implement various WASI APIs via the method implementations
19/// below.
20///
21/// Built-in implementations are provided for [`Stdin`],
22/// [`pipe::MemoryInputPipe`], and [`pipe::ClosedInputStream`].
23pub trait StdinStream: Send {
24    /// Creates a fresh stream which is reading stdin.
25    ///
26    /// Note that the returned stream must share state with all other streams
27    /// previously created. Guests may create multiple handles to the same stdin
28    /// and they should all be synchronized in their progress through the
29    /// program's input.
30    ///
31    /// Note that this means that if one handle becomes ready for reading they
32    /// all become ready for reading. Subsequently if one is read from it may
33    /// mean that all the others are no longer ready for reading. This is
34    /// basically a consequence of the way the WIT APIs are designed today.
35    fn stream(&self) -> Box<dyn InputStream>;
36
37    /// Returns whether this stream is backed by a TTY.
38    fn isatty(&self) -> bool;
39}
40
41impl StdinStream for pipe::MemoryInputPipe {
42    fn stream(&self) -> Box<dyn InputStream> {
43        Box::new(self.clone())
44    }
45
46    fn isatty(&self) -> bool {
47        false
48    }
49}
50
51impl StdinStream for pipe::ClosedInputStream {
52    fn stream(&self) -> Box<dyn InputStream> {
53        Box::new(*self)
54    }
55
56    fn isatty(&self) -> bool {
57        false
58    }
59}
60
61/// An impl of [`StdinStream`] built on top of [`crate::pipe::AsyncReadStream`].
62//
63// Note the usage of `tokio::sync::Mutex` here as opposed to a
64// `std::sync::Mutex`. This is intentionally done to implement the `Pollable`
65// variant of this trait. Note that in doing so we're left with the quandry of
66// how to implement methods of `InputStream` since those methods are not
67// `async`. They're currently implemented with `try_lock`, which then raises the
68// question of what to do on contention. Currently traps are returned.
69//
70// Why should it be ok to return a trap? In general concurrency/contention
71// shouldn't return a trap since it should be able to happen normally. The
72// current assumption, though, is that WASI stdin/stdout streams are special
73// enough that the contention case should never come up in practice. Currently
74// in WASI there is no actually concurrency, there's just the items in a single
75// `Store` and that store owns all of its I/O in a single Tokio task. There's no
76// means to actually spawn multiple Tokio tasks that use the same store. This
77// means at the very least that there's zero parallelism. Due to the lack of
78// multiple tasks that also means that there's no concurrency either.
79//
80// This `AsyncStdinStream` wrapper is only intended to be used by the WASI
81// bindings themselves. It's possible for the host to take this and work with it
82// on its own task, but that's niche enough it's not designed for.
83//
84// Overall that means that the guest is either calling `Pollable` or
85// `InputStream` methods. This means that there should never be contention
86// between the two at this time. This may all change in the future with WASI
87// 0.3, but perhaps we'll have a better story for stdio at that time (see the
88// doc block on the `OutputStream` impl below)
89pub struct AsyncStdinStream(Arc<Mutex<crate::pipe::AsyncReadStream>>);
90
91impl AsyncStdinStream {
92    pub fn new(s: crate::pipe::AsyncReadStream) -> Self {
93        Self(Arc::new(Mutex::new(s)))
94    }
95}
96
97impl StdinStream for AsyncStdinStream {
98    fn stream(&self) -> Box<dyn InputStream> {
99        Box::new(Self(self.0.clone()))
100    }
101    fn isatty(&self) -> bool {
102        false
103    }
104}
105
106#[async_trait::async_trait]
107impl InputStream for AsyncStdinStream {
108    fn read(&mut self, size: usize) -> Result<bytes::Bytes, StreamError> {
109        match self.0.try_lock() {
110            Ok(mut stream) => stream.read(size),
111            Err(_) => Err(StreamError::trap("concurrent reads are not supported")),
112        }
113    }
114    fn skip(&mut self, size: usize) -> Result<usize, StreamError> {
115        match self.0.try_lock() {
116            Ok(mut stream) => stream.skip(size),
117            Err(_) => Err(StreamError::trap("concurrent skips are not supported")),
118        }
119    }
120    async fn cancel(&mut self) {
121        // Cancel the inner stream if we're the last reference to it:
122        if let Some(mutex) = Arc::get_mut(&mut self.0) {
123            match mutex.try_lock() {
124                Ok(mut stream) => stream.cancel().await,
125                Err(_) => {}
126            }
127        }
128    }
129}
130
131#[async_trait::async_trait]
132impl Pollable for AsyncStdinStream {
133    async fn ready(&mut self) {
134        self.0.lock().await.ready().await
135    }
136}
137
138mod worker_thread_stdin;
139pub use self::worker_thread_stdin::{stdin, Stdin};
140
141/// Similar to [`StdinStream`], except for output.
142pub trait StdoutStream: Send {
143    /// Returns a fresh new stream which can write to this output stream.
144    ///
145    /// Note that all output streams should output to the same logical source.
146    /// This means that it's possible for each independent stream to acquire a
147    /// separate "permit" to write and then act on that permit. Note that
148    /// additionally at this time once a permit is "acquired" there's no way to
149    /// release it, for example you can wait for readiness and then never
150    /// actually write in WASI. This means that acquisition of a permit for one
151    /// stream cannot discount the size of a permit another stream could
152    /// obtain.
153    ///
154    /// Implementations must be able to handle this
155    fn stream(&self) -> Box<dyn OutputStream>;
156
157    /// Returns whether this stream is backed by a TTY.
158    fn isatty(&self) -> bool;
159}
160
161impl StdoutStream for pipe::MemoryOutputPipe {
162    fn stream(&self) -> Box<dyn OutputStream> {
163        Box::new(self.clone())
164    }
165
166    fn isatty(&self) -> bool {
167        false
168    }
169}
170
171impl StdoutStream for pipe::SinkOutputStream {
172    fn stream(&self) -> Box<dyn OutputStream> {
173        Box::new(*self)
174    }
175
176    fn isatty(&self) -> bool {
177        false
178    }
179}
180
181impl StdoutStream for pipe::ClosedOutputStream {
182    fn stream(&self) -> Box<dyn OutputStream> {
183        Box::new(*self)
184    }
185
186    fn isatty(&self) -> bool {
187        false
188    }
189}
190
/// A [`StdoutStream`] that writes straight into a file.
///
/// The streams it yields block while writing. Use [`AsyncStdoutStream`]
/// instead when truly asynchronous output is needed.
pub struct OutputFile {
    /// File handle shared by every stream created from this value.
    file: Arc<std::fs::File>,
}

impl OutputFile {
    /// Takes ownership of `file` and places it behind a shared pointer.
    pub fn new(file: std::fs::File) -> Self {
        let file = Arc::new(file);
        Self { file }
    }
}
205
206impl StdoutStream for OutputFile {
207    fn stream(&self) -> Box<dyn OutputStream> {
208        Box::new(OutputFileStream {
209            file: Arc::clone(&self.file),
210        })
211    }
212
213    fn isatty(&self) -> bool {
214        false
215    }
216}
217
218struct OutputFileStream {
219    file: Arc<std::fs::File>,
220}
221
222#[async_trait::async_trait]
223impl Pollable for OutputFileStream {
224    async fn ready(&mut self) {}
225}
226
227impl OutputStream for OutputFileStream {
228    fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
229        use std::io::Write;
230        self.file
231            .write_all(&bytes)
232            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
233    }
234
235    fn flush(&mut self) -> StreamResult<()> {
236        use std::io::Write;
237        self.file
238            .flush()
239            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
240    }
241
242    fn check_write(&mut self) -> StreamResult<usize> {
243        Ok(1024 * 1024)
244    }
245}
246
/// Handle to the host's standard output.
///
/// Streams created from it block while writing, because they use the Rust
/// standard library implementation directly. A different [`StdoutStream`]
/// implementation is needed when truly asynchronous output streams are
/// required.
pub struct Stdout;

/// Creates a handle that stands for the host's standard out.
///
/// The result can be passed to
/// [`WasiCtxBuilder::stdout`](crate::WasiCtxBuilder::stdout).
pub fn stdout() -> Stdout {
    Stdout {}
}
260
261impl StdoutStream for Stdout {
262    fn stream(&self) -> Box<dyn OutputStream> {
263        Box::new(StdioOutputStream::Stdout)
264    }
265
266    fn isatty(&self) -> bool {
267        std::io::stdout().is_terminal()
268    }
269}
270
/// Handle to the host's standard error.
///
/// Streams created from it block while writing, because they use the Rust
/// standard library implementation directly. A different [`StdoutStream`]
/// implementation is needed when truly asynchronous output streams are
/// required.
pub struct Stderr;

/// Creates a handle that stands for the host's standard err.
///
/// The result can be passed to
/// [`WasiCtxBuilder::stderr`](crate::WasiCtxBuilder::stderr).
pub fn stderr() -> Stderr {
    Stderr {}
}
284
285impl StdoutStream for Stderr {
286    fn stream(&self) -> Box<dyn OutputStream> {
287        Box::new(StdioOutputStream::Stderr)
288    }
289
290    fn isatty(&self) -> bool {
291        std::io::stderr().is_terminal()
292    }
293}
294
/// Selects which of the process's standard output handles a stream writes to.
enum StdioOutputStream {
    /// Write through `std::io::stdout()`.
    Stdout,
    /// Write through `std::io::stderr()`.
    Stderr,
}
299
300impl OutputStream for StdioOutputStream {
301    fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
302        use std::io::Write;
303        match self {
304            StdioOutputStream::Stdout => std::io::stdout().write_all(&bytes),
305            StdioOutputStream::Stderr => std::io::stderr().write_all(&bytes),
306        }
307        .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
308    }
309
310    fn flush(&mut self) -> StreamResult<()> {
311        use std::io::Write;
312        match self {
313            StdioOutputStream::Stdout => std::io::stdout().flush(),
314            StdioOutputStream::Stderr => std::io::stderr().flush(),
315        }
316        .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
317    }
318
319    fn check_write(&mut self) -> StreamResult<usize> {
320        Ok(1024 * 1024)
321    }
322}
323
324#[async_trait::async_trait]
325impl Pollable for StdioOutputStream {
326    async fn ready(&mut self) {}
327}
328
329/// A wrapper of [`crate::pipe::AsyncWriteStream`] that implements
330/// [`StdoutStream`]. Note that the [`OutputStream`] impl for this is not
331/// correct when used for interleaved async IO.
332//
333// Note that the use of `tokio::sync::Mutex` here is intentional, in addition to
334// the `try_lock()` calls below in the implementation of `OutputStream`. For
335// more information see the documentation on `AsyncStdinStream`.
336pub struct AsyncStdoutStream(Arc<Mutex<crate::pipe::AsyncWriteStream>>);
337
338impl AsyncStdoutStream {
339    pub fn new(s: crate::pipe::AsyncWriteStream) -> Self {
340        Self(Arc::new(Mutex::new(s)))
341    }
342}
343
344impl StdoutStream for AsyncStdoutStream {
345    fn stream(&self) -> Box<dyn OutputStream> {
346        Box::new(Self(self.0.clone()))
347    }
348    fn isatty(&self) -> bool {
349        false
350    }
351}
352
353// This implementation is known to be bogus. All check-writes and writes are
354// directed at the same underlying stream. The check-write/write protocol does
355// require the size returned by a check-write to be accepted by write, even if
356// other side-effects happen between those calls, and this implementation
357// permits another view (created by StdoutStream::stream()) of the same
358// underlying stream to accept a write which will invalidate a prior
359// check-write of another view.
360// Ultimately, the Std{in,out}Stream::stream() methods exist because many
361// different places in a linked component (which may itself contain many
362// modules) may need to access stdio without any coordination to keep those
363// accesses all using pointing to the same resource. So, we allow many
364// resources to be created. We have the reasonable expectation that programs
365// won't attempt to interleave async IO from these disparate uses of stdio.
366// If that expectation doesn't turn out to be true, and you find yourself at
367// this comment to correct it: sorry about that.
368#[async_trait::async_trait]
369impl OutputStream for AsyncStdoutStream {
370    fn check_write(&mut self) -> Result<usize, StreamError> {
371        match self.0.try_lock() {
372            Ok(mut stream) => stream.check_write(),
373            Err(_) => Err(StreamError::trap("concurrent writes are not supported")),
374        }
375    }
376    fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
377        match self.0.try_lock() {
378            Ok(mut stream) => stream.write(bytes),
379            Err(_) => Err(StreamError::trap("concurrent writes not supported yet")),
380        }
381    }
382    fn flush(&mut self) -> Result<(), StreamError> {
383        match self.0.try_lock() {
384            Ok(mut stream) => stream.flush(),
385            Err(_) => Err(StreamError::trap("concurrent flushes not supported yet")),
386        }
387    }
388    async fn cancel(&mut self) {
389        // Cancel the inner stream if we're the last reference to it:
390        if let Some(mutex) = Arc::get_mut(&mut self.0) {
391            match mutex.try_lock() {
392                Ok(mut stream) => stream.cancel().await,
393                Err(_) => {}
394            }
395        }
396    }
397}
398
399#[async_trait::async_trait]
400impl Pollable for AsyncStdoutStream {
401    async fn ready(&mut self) {
402        self.0.lock().await.ready().await
403    }
404}
405
/// Two-state marker for whether something is a TTY.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IsATTY {
    /// It is a TTY.
    Yes,
    /// It is not a TTY.
    No,
}
411
412impl<T> stdin::Host for WasiImpl<T>
413where
414    T: WasiView,
415{
416    fn get_stdin(&mut self) -> Result<Resource<streams::DynInputStream>, anyhow::Error> {
417        let stream = self.ctx().stdin.stream();
418        Ok(self.table().push(stream)?)
419    }
420}
421
422impl<T> stdout::Host for WasiImpl<T>
423where
424    T: WasiView,
425{
426    fn get_stdout(&mut self) -> Result<Resource<streams::DynOutputStream>, anyhow::Error> {
427        let stream = self.ctx().stdout.stream();
428        Ok(self.table().push(stream)?)
429    }
430}
431
432impl<T> stderr::Host for WasiImpl<T>
433where
434    T: WasiView,
435{
436    fn get_stderr(&mut self) -> Result<Resource<streams::DynOutputStream>, anyhow::Error> {
437        let stream = self.ctx().stderr.stream();
438        Ok(self.table().push(stream)?)
439    }
440}
441
/// Resource type pushed into the table when stdin reports that it is a TTY.
pub struct TerminalInput;
/// Resource type pushed into the table when stdout or stderr reports that it
/// is a TTY.
pub struct TerminalOutput;
444
445impl<T> terminal_input::Host for WasiImpl<T> where T: WasiView {}
446impl<T> terminal_input::HostTerminalInput for WasiImpl<T>
447where
448    T: WasiView,
449{
450    fn drop(&mut self, r: Resource<TerminalInput>) -> anyhow::Result<()> {
451        self.table().delete(r)?;
452        Ok(())
453    }
454}
455impl<T> terminal_output::Host for WasiImpl<T> where T: WasiView {}
456impl<T> terminal_output::HostTerminalOutput for WasiImpl<T>
457where
458    T: WasiView,
459{
460    fn drop(&mut self, r: Resource<TerminalOutput>) -> anyhow::Result<()> {
461        self.table().delete(r)?;
462        Ok(())
463    }
464}
465impl<T> terminal_stdin::Host for WasiImpl<T>
466where
467    T: WasiView,
468{
469    fn get_terminal_stdin(&mut self) -> anyhow::Result<Option<Resource<TerminalInput>>> {
470        if self.ctx().stdin.isatty() {
471            let fd = self.table().push(TerminalInput)?;
472            Ok(Some(fd))
473        } else {
474            Ok(None)
475        }
476    }
477}
478impl<T> terminal_stdout::Host for WasiImpl<T>
479where
480    T: WasiView,
481{
482    fn get_terminal_stdout(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
483        if self.ctx().stdout.isatty() {
484            let fd = self.table().push(TerminalOutput)?;
485            Ok(Some(fd))
486        } else {
487            Ok(None)
488        }
489    }
490}
491impl<T> terminal_stderr::Host for WasiImpl<T>
492where
493    T: WasiView,
494{
495    fn get_terminal_stderr(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
496        if self.ctx().stderr.isatty() {
497            let fd = self.table().push(TerminalOutput)?;
498            Ok(Some(fd))
499        } else {
500            Ok(None)
501        }
502    }
503}
504
#[cfg(test)]
mod test {
    use super::StdinStream;
    use crate::stdio::StdoutStream;
    use crate::write_stream::AsyncWriteStream;
    use crate::{AsyncStdoutStream, OutputStream};
    use anyhow::Result;
    use bytes::Bytes;
    use tokio::io::AsyncReadExt;

    /// Input text shared by the stdin tests below.
    const SENTENCE: &str = "the quick brown fox jumped over the three lazy dogs";

    #[test]
    fn memory_stdin_stream() {
        // Every InputStream produced by StdinStream::stream() is a view onto
        // one piece of shared state, so input consumed through one view is
        // consumed for all of them.
        //
        // MemoryInputPipe is the simplest StdinStream impl to check that
        // property against.

        let pipe = super::pipe::MemoryInputPipe::new(SENTENCE);

        let mut first = pipe.stream();
        let mut second = pipe.stream();

        let chunk = first.read(10).expect("read first 10 bytes");
        assert_eq!(chunk, "the quick ".as_bytes(), "first 10 bytes");
        let chunk = second.read(10).expect("read second 10 bytes");
        assert_eq!(chunk, "brown fox ".as_bytes(), "second 10 bytes");
        let chunk = first.read(10).expect("read third 10 bytes");
        assert_eq!(chunk, "jumped ove".as_bytes(), "third 10 bytes");
        let chunk = second.read(10).expect("read fourth 10 bytes");
        assert_eq!(chunk, "r the thre".as_bytes(), "fourth 10 bytes");
    }

    #[tokio::test]
    async fn async_stdin_stream() {
        // Same shared-state property as in `memory_stdin_stream`, checked
        // against the slightly more involved AsyncStdinStream: an
        // AsyncReadStream is built from a file on disk, wrapped in an
        // AsyncStdinStream, and two views of it are read alternately.

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("file");
        std::fs::write(&path, SENTENCE).unwrap();

        let file = tokio::fs::File::open(&path)
            .await
            .expect("open created file");
        let reader = crate::pipe::AsyncReadStream::new(file);
        let stdin_stream = super::AsyncStdinStream::new(reader);

        let mut first = stdin_stream.stream();
        let mut second = stdin_stream.stream();

        first.ready().await;

        let chunk = first.read(10).expect("read first 10 bytes");
        assert_eq!(chunk, "the quick ".as_bytes(), "first 10 bytes");
        let chunk = second.read(10).expect("read second 10 bytes");
        assert_eq!(chunk, "brown fox ".as_bytes(), "second 10 bytes");
        let chunk = first.read(10).expect("read third 10 bytes");
        assert_eq!(chunk, "jumped ove".as_bytes(), "third 10 bytes");
        let chunk = second.read(10).expect("read fourth 10 bytes");
        assert_eq!(chunk, "r the thre".as_bytes(), "fourth 10 bytes");
    }

    #[tokio::test]
    async fn async_stdout_stream_unblocks() {
        let (mut reader, writer) = tokio::io::duplex(32);
        let stdout = AsyncStdoutStream::new(AsyncWriteStream::new(32, writer));

        // Write a single byte from a separate task.
        let writer_task = tokio::task::spawn(async move {
            let mut stream = stdout.stream();
            blocking_write_and_flush(&mut *stream, "x".into())
                .await
                .unwrap();
        });

        let mut buf = [0; 100];
        let count = reader.read(&mut buf).await.unwrap();
        assert_eq!(&buf[..count], b"x");

        writer_task.await.unwrap();
    }

    /// Writes all of `bytes` to `s` in permit-sized chunks, then flushes and
    /// waits until the stream is ready for writing again.
    async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
        while !bytes.is_empty() {
            let budget = s.write_ready().await?;
            let take = std::cmp::min(bytes.len(), budget);
            s.write(bytes.split_to(take))?;
        }

        s.flush()?;
        s.write_ready().await?;
        Ok(())
    }
}