1use crate::bindings::cli::{
2 stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr, terminal_stdin,
3 terminal_stdout,
4};
5use crate::pipe;
6use crate::{
7 InputStream, IoView, OutputStream, Pollable, StreamError, StreamResult, WasiImpl, WasiView,
8};
9use bytes::Bytes;
10use std::io::IsTerminal;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use wasmtime::component::Resource;
14use wasmtime_wasi_io::streams;
15
16pub trait StdinStream: Send {
24 fn stream(&self) -> Box<dyn InputStream>;
36
37 fn isatty(&self) -> bool;
39}
40
41impl StdinStream for pipe::MemoryInputPipe {
42 fn stream(&self) -> Box<dyn InputStream> {
43 Box::new(self.clone())
44 }
45
46 fn isatty(&self) -> bool {
47 false
48 }
49}
50
51impl StdinStream for pipe::ClosedInputStream {
52 fn stream(&self) -> Box<dyn InputStream> {
53 Box::new(*self)
54 }
55
56 fn isatty(&self) -> bool {
57 false
58 }
59}
60
61pub struct AsyncStdinStream(Arc<Mutex<crate::pipe::AsyncReadStream>>);
90
91impl AsyncStdinStream {
92 pub fn new(s: crate::pipe::AsyncReadStream) -> Self {
93 Self(Arc::new(Mutex::new(s)))
94 }
95}
96
97impl StdinStream for AsyncStdinStream {
98 fn stream(&self) -> Box<dyn InputStream> {
99 Box::new(Self(self.0.clone()))
100 }
101 fn isatty(&self) -> bool {
102 false
103 }
104}
105
106#[async_trait::async_trait]
107impl InputStream for AsyncStdinStream {
108 fn read(&mut self, size: usize) -> Result<bytes::Bytes, StreamError> {
109 match self.0.try_lock() {
110 Ok(mut stream) => stream.read(size),
111 Err(_) => Err(StreamError::trap("concurrent reads are not supported")),
112 }
113 }
114 fn skip(&mut self, size: usize) -> Result<usize, StreamError> {
115 match self.0.try_lock() {
116 Ok(mut stream) => stream.skip(size),
117 Err(_) => Err(StreamError::trap("concurrent skips are not supported")),
118 }
119 }
120 async fn cancel(&mut self) {
121 if let Some(mutex) = Arc::get_mut(&mut self.0) {
123 match mutex.try_lock() {
124 Ok(mut stream) => stream.cancel().await,
125 Err(_) => {}
126 }
127 }
128 }
129}
130
131#[async_trait::async_trait]
132impl Pollable for AsyncStdinStream {
133 async fn ready(&mut self) {
134 self.0.lock().await.ready().await
135 }
136}
137
138mod worker_thread_stdin;
139pub use self::worker_thread_stdin::{stdin, Stdin};
140
141pub trait StdoutStream: Send {
143 fn stream(&self) -> Box<dyn OutputStream>;
156
157 fn isatty(&self) -> bool;
159}
160
161impl StdoutStream for pipe::MemoryOutputPipe {
162 fn stream(&self) -> Box<dyn OutputStream> {
163 Box::new(self.clone())
164 }
165
166 fn isatty(&self) -> bool {
167 false
168 }
169}
170
171impl StdoutStream for pipe::SinkOutputStream {
172 fn stream(&self) -> Box<dyn OutputStream> {
173 Box::new(*self)
174 }
175
176 fn isatty(&self) -> bool {
177 false
178 }
179}
180
181impl StdoutStream for pipe::ClosedOutputStream {
182 fn stream(&self) -> Box<dyn OutputStream> {
183 Box::new(*self)
184 }
185
186 fn isatty(&self) -> bool {
187 false
188 }
189}
190
/// Output sink backed by a [`std::fs::File`].
///
/// The file is kept behind an `Arc` so it can be shared with other holders
/// of the same handle.
pub struct OutputFile {
    file: Arc<std::fs::File>,
}

impl OutputFile {
    /// Takes ownership of `file` and wraps it in a shared handle.
    pub fn new(file: std::fs::File) -> Self {
        let file = Arc::new(file);
        Self { file }
    }
}
205
206impl StdoutStream for OutputFile {
207 fn stream(&self) -> Box<dyn OutputStream> {
208 Box::new(OutputFileStream {
209 file: Arc::clone(&self.file),
210 })
211 }
212
213 fn isatty(&self) -> bool {
214 false
215 }
216}
217
218struct OutputFileStream {
219 file: Arc<std::fs::File>,
220}
221
222#[async_trait::async_trait]
223impl Pollable for OutputFileStream {
224 async fn ready(&mut self) {}
225}
226
227impl OutputStream for OutputFileStream {
228 fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
229 use std::io::Write;
230 self.file
231 .write_all(&bytes)
232 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
233 }
234
235 fn flush(&mut self) -> StreamResult<()> {
236 use std::io::Write;
237 self.file
238 .flush()
239 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
240 }
241
242 fn check_write(&mut self) -> StreamResult<usize> {
243 Ok(1024 * 1024)
244 }
245}
246
/// Marker type for the host process's standard output.
pub struct Stdout;

/// Returns the [`Stdout`] marker value.
pub fn stdout() -> Stdout {
    Stdout {}
}
260
261impl StdoutStream for Stdout {
262 fn stream(&self) -> Box<dyn OutputStream> {
263 Box::new(StdioOutputStream::Stdout)
264 }
265
266 fn isatty(&self) -> bool {
267 std::io::stdout().is_terminal()
268 }
269}
270
/// Marker type for the host process's standard error.
pub struct Stderr;

/// Returns the [`Stderr`] marker value.
pub fn stderr() -> Stderr {
    Stderr {}
}
284
285impl StdoutStream for Stderr {
286 fn stream(&self) -> Box<dyn OutputStream> {
287 Box::new(StdioOutputStream::Stderr)
288 }
289
290 fn isatty(&self) -> bool {
291 std::io::stderr().is_terminal()
292 }
293}
294
/// Selects which of the host process's standard streams a view writes to.
enum StdioOutputStream {
    /// Write to the process's standard output.
    Stdout,
    /// Write to the process's standard error.
    Stderr,
}
299
300impl OutputStream for StdioOutputStream {
301 fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
302 use std::io::Write;
303 match self {
304 StdioOutputStream::Stdout => std::io::stdout().write_all(&bytes),
305 StdioOutputStream::Stderr => std::io::stderr().write_all(&bytes),
306 }
307 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
308 }
309
310 fn flush(&mut self) -> StreamResult<()> {
311 use std::io::Write;
312 match self {
313 StdioOutputStream::Stdout => std::io::stdout().flush(),
314 StdioOutputStream::Stderr => std::io::stderr().flush(),
315 }
316 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
317 }
318
319 fn check_write(&mut self) -> StreamResult<usize> {
320 Ok(1024 * 1024)
321 }
322}
323
324#[async_trait::async_trait]
325impl Pollable for StdioOutputStream {
326 async fn ready(&mut self) {}
327}
328
329pub struct AsyncStdoutStream(Arc<Mutex<crate::pipe::AsyncWriteStream>>);
337
338impl AsyncStdoutStream {
339 pub fn new(s: crate::pipe::AsyncWriteStream) -> Self {
340 Self(Arc::new(Mutex::new(s)))
341 }
342}
343
344impl StdoutStream for AsyncStdoutStream {
345 fn stream(&self) -> Box<dyn OutputStream> {
346 Box::new(Self(self.0.clone()))
347 }
348 fn isatty(&self) -> bool {
349 false
350 }
351}
352
353#[async_trait::async_trait]
369impl OutputStream for AsyncStdoutStream {
370 fn check_write(&mut self) -> Result<usize, StreamError> {
371 match self.0.try_lock() {
372 Ok(mut stream) => stream.check_write(),
373 Err(_) => Err(StreamError::trap("concurrent writes are not supported")),
374 }
375 }
376 fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
377 match self.0.try_lock() {
378 Ok(mut stream) => stream.write(bytes),
379 Err(_) => Err(StreamError::trap("concurrent writes not supported yet")),
380 }
381 }
382 fn flush(&mut self) -> Result<(), StreamError> {
383 match self.0.try_lock() {
384 Ok(mut stream) => stream.flush(),
385 Err(_) => Err(StreamError::trap("concurrent flushes not supported yet")),
386 }
387 }
388 async fn cancel(&mut self) {
389 if let Some(mutex) = Arc::get_mut(&mut self.0) {
391 match mutex.try_lock() {
392 Ok(mut stream) => stream.cancel().await,
393 Err(_) => {}
394 }
395 }
396 }
397}
398
399#[async_trait::async_trait]
400impl Pollable for AsyncStdoutStream {
401 async fn ready(&mut self) {
402 self.0.lock().await.ready().await
403 }
404}
405
/// Two-state flag recording whether something is a terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsATTY {
    /// It is a terminal.
    Yes,
    /// It is not a terminal.
    No,
}
411
412impl<T> stdin::Host for WasiImpl<T>
413where
414 T: WasiView,
415{
416 fn get_stdin(&mut self) -> Result<Resource<streams::DynInputStream>, anyhow::Error> {
417 let stream = self.ctx().stdin.stream();
418 Ok(self.table().push(stream)?)
419 }
420}
421
422impl<T> stdout::Host for WasiImpl<T>
423where
424 T: WasiView,
425{
426 fn get_stdout(&mut self) -> Result<Resource<streams::DynOutputStream>, anyhow::Error> {
427 let stream = self.ctx().stdout.stream();
428 Ok(self.table().push(stream)?)
429 }
430}
431
432impl<T> stderr::Host for WasiImpl<T>
433where
434 T: WasiView,
435{
436 fn get_stderr(&mut self) -> Result<Resource<streams::DynOutputStream>, anyhow::Error> {
437 let stream = self.ctx().stderr.stream();
438 Ok(self.table().push(stream)?)
439 }
440}
441
/// Marker resource handed out when stdin reports itself as a terminal.
pub struct TerminalInput;

/// Marker resource handed out when stdout or stderr reports itself as a
/// terminal.
pub struct TerminalOutput;
444
445impl<T> terminal_input::Host for WasiImpl<T> where T: WasiView {}
446impl<T> terminal_input::HostTerminalInput for WasiImpl<T>
447where
448 T: WasiView,
449{
450 fn drop(&mut self, r: Resource<TerminalInput>) -> anyhow::Result<()> {
451 self.table().delete(r)?;
452 Ok(())
453 }
454}
455impl<T> terminal_output::Host for WasiImpl<T> where T: WasiView {}
456impl<T> terminal_output::HostTerminalOutput for WasiImpl<T>
457where
458 T: WasiView,
459{
460 fn drop(&mut self, r: Resource<TerminalOutput>) -> anyhow::Result<()> {
461 self.table().delete(r)?;
462 Ok(())
463 }
464}
465impl<T> terminal_stdin::Host for WasiImpl<T>
466where
467 T: WasiView,
468{
469 fn get_terminal_stdin(&mut self) -> anyhow::Result<Option<Resource<TerminalInput>>> {
470 if self.ctx().stdin.isatty() {
471 let fd = self.table().push(TerminalInput)?;
472 Ok(Some(fd))
473 } else {
474 Ok(None)
475 }
476 }
477}
478impl<T> terminal_stdout::Host for WasiImpl<T>
479where
480 T: WasiView,
481{
482 fn get_terminal_stdout(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
483 if self.ctx().stdout.isatty() {
484 let fd = self.table().push(TerminalOutput)?;
485 Ok(Some(fd))
486 } else {
487 Ok(None)
488 }
489 }
490}
491impl<T> terminal_stderr::Host for WasiImpl<T>
492where
493 T: WasiView,
494{
495 fn get_terminal_stderr(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
496 if self.ctx().stderr.isatty() {
497 let fd = self.table().push(TerminalOutput)?;
498 Ok(Some(fd))
499 } else {
500 Ok(None)
501 }
502 }
503}
504
#[cfg(test)]
mod test {
    use crate::stdio::StdoutStream;
    use crate::write_stream::AsyncWriteStream;
    use crate::{AsyncStdoutStream, OutputStream};
    use anyhow::Result;
    use bytes::Bytes;
    use tokio::io::AsyncReadExt;

    /// Text served as stdin contents by the input-stream tests.
    const SENTENCE: &str = "the quick brown fox jumped over the three lazy dogs";

    /// Reads four 10-byte chunks, alternating between `first` and `second`,
    /// and checks that together they walk through `SENTENCE` in order. This
    /// shows that both views advance one shared read position.
    fn assert_alternating_reads(
        first: &mut dyn super::InputStream,
        second: &mut dyn super::InputStream,
    ) {
        // (expected chunk, message if the read fails, message if it differs)
        let steps = [
            ("the quick ", "read first 10 bytes", "first 10 bytes"),
            ("brown fox ", "read second 10 bytes", "second 10 bytes"),
            ("jumped ove", "read third 10 bytes", "third 10 bytes"),
            ("r the thre", "read fourth 10 bytes", "fourth 10 bytes"),
        ];

        for (index, &(want, read_msg, cmp_msg)) in steps.iter().enumerate() {
            let view = if index % 2 == 0 {
                &mut *first
            } else {
                &mut *second
            };
            let got = view.read(10).expect(read_msg);
            assert_eq!(got, want.as_bytes(), "{}", cmp_msg);
        }
    }

    /// Two views of one in-memory pipe share a single read position.
    #[test]
    fn memory_stdin_stream() {
        use super::StdinStream;

        let pipe = super::pipe::MemoryInputPipe::new(SENTENCE);

        let mut view1 = pipe.stream();
        let mut view2 = pipe.stream();

        assert_alternating_reads(&mut *view1, &mut *view2);
    }

    /// Two views of one async stdin stream share a single read position.
    #[tokio::test]
    async fn async_stdin_stream() {
        use super::StdinStream;

        // Back the stream with a real file holding the test sentence.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("file");
        std::fs::write(&path, SENTENCE).unwrap();

        let file = tokio::fs::File::open(&path)
            .await
            .expect("open created file");
        let stdin_stream = super::AsyncStdinStream::new(crate::pipe::AsyncReadStream::new(file));

        let mut view1 = stdin_stream.stream();
        let mut view2 = stdin_stream.stream();

        // Wait for the stream to report ready before the synchronous reads.
        view1.ready().await;

        assert_alternating_reads(&mut *view1, &mut *view2);
    }

    /// A write-and-flush on a spawned task completes once the other end of
    /// the duplex pipe has read the data.
    #[tokio::test]
    async fn async_stdout_stream_unblocks() {
        let (mut reader, writer) = tokio::io::duplex(32);
        let stdout = AsyncStdoutStream::new(AsyncWriteStream::new(32, writer));

        let writer_task = tokio::task::spawn(async move {
            let mut stream = stdout.stream();
            blocking_write_and_flush(&mut *stream, "x".into())
                .await
                .unwrap();
        });

        let mut received = [0; 100];
        let count = reader.read(&mut received).await.unwrap();
        assert_eq!(&received[..count], b"x");

        writer_task.await.unwrap();
    }

    /// Writes all of `bytes` to `s` in chunks no larger than the stream's
    /// reported write permit, then flushes and waits until the stream is
    /// write-ready again.
    async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
        loop {
            if bytes.is_empty() {
                break;
            }
            let budget = s.write_ready().await?;
            let take = std::cmp::min(bytes.len(), budget);
            s.write(bytes.split_to(take))?;
        }

        s.flush()?;
        s.write_ready().await?;
        Ok(())
    }
}