use crate::preview2::bindings::cli::{
stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr, terminal_stdin,
terminal_stdout,
};
use crate::preview2::bindings::io::streams;
use crate::preview2::pipe::{self, AsyncWriteStream};
use crate::preview2::{HostInputStream, HostOutputStream, WasiView};
use std::io::IsTerminal;
use wasmtime::component::Resource;
/// A provider of stdin streams, implemented in this file by the in-memory and closed pipe
/// types.
///
/// NOTE(review): `get_stdin` and `get_terminal_stdin` in this file call `stream()` and
/// `isatty()` on the context's `stdin` field, which is presumably a value of this trait —
/// confirm against the context definition.
pub trait StdinStream: Send + Sync {
/// Produces a boxed input stream for this stdin source.
fn stream(&self) -> Box<dyn HostInputStream>;
/// Reports whether this stdin source should be treated as a terminal.
fn isatty(&self) -> bool;
}
impl StdinStream for pipe::MemoryInputPipe {
    /// Boxes a clone of this pipe so it can be handed out as an input stream.
    fn stream(&self) -> Box<dyn HostInputStream> {
        let duplicate = self.clone();
        Box::new(duplicate)
    }

    /// An in-memory pipe is never reported as a terminal.
    fn isatty(&self) -> bool {
        false
    }
}
impl StdinStream for pipe::ClosedInputStream {
    /// Boxes a clone of this closed stream so it can be handed out as an input stream.
    fn stream(&self) -> Box<dyn HostInputStream> {
        let closed_copy = self.clone();
        Box::new(closed_copy)
    }

    /// A closed input stream is never reported as a terminal.
    fn isatty(&self) -> bool {
        false
    }
}
// The process-stdin implementation lives in the `worker_thread_stdin` submodule; its
// `stdin` constructor and `Stdin` type are re-exported from this module.
mod worker_thread_stdin;
pub use self::worker_thread_stdin::{stdin, Stdin};
/// Size argument passed to `AsyncWriteStream::new` when wrapping the process stdout and
/// stderr (presumably a capacity in bytes — confirm against `AsyncWriteStream`).
const STDIO_BUFFER_SIZE: usize = 4096;
/// A provider of output streams. In this file it is implemented for the pipe types and for
/// both `Stdout` and `Stderr`, so the same trait covers stdout and stderr.
///
/// NOTE(review): the `get_stdout` / `get_stderr` host calls in this file call `stream()` on
/// the context's `stdout` / `stderr` fields, which are presumably values of this trait —
/// confirm against the context definition.
pub trait StdoutStream: Send + Sync {
/// Produces a boxed output stream for this sink.
fn stream(&self) -> Box<dyn HostOutputStream>;
/// Reports whether this sink should be treated as a terminal.
fn isatty(&self) -> bool;
}
impl StdoutStream for pipe::MemoryOutputPipe {
    /// Boxes a clone of this pipe so it can be handed out as an output stream.
    fn stream(&self) -> Box<dyn HostOutputStream> {
        let duplicate = self.clone();
        Box::new(duplicate)
    }

    /// An in-memory pipe is never reported as a terminal.
    fn isatty(&self) -> bool {
        false
    }
}
impl StdoutStream for pipe::SinkOutputStream {
    /// Boxes a clone of this sink so it can be handed out as an output stream.
    fn stream(&self) -> Box<dyn HostOutputStream> {
        let sink_copy = self.clone();
        Box::new(sink_copy)
    }

    /// A sink stream is never reported as a terminal.
    fn isatty(&self) -> bool {
        false
    }
}
impl StdoutStream for pipe::ClosedOutputStream {
    /// Boxes a clone of this closed stream so it can be handed out as an output stream.
    fn stream(&self) -> Box<dyn HostOutputStream> {
        let closed_copy = self.clone();
        Box::new(closed_copy)
    }

    /// A closed output stream is never reported as a terminal.
    fn isatty(&self) -> bool {
        false
    }
}
/// Unit type standing for the host process's stdout; its `StdoutStream` impl writes to
/// `tokio::io::stdout()`.
pub struct Stdout;
/// Returns the `Stdout` marker value.
pub fn stdout() -> Stdout {
Stdout
}
impl StdoutStream for Stdout {
    /// Wraps the tokio stdout handle in an `AsyncWriteStream` constructed with
    /// `STDIO_BUFFER_SIZE` and returns it boxed.
    fn stream(&self) -> Box<dyn HostOutputStream> {
        let writer = tokio::io::stdout();
        let wrapped = AsyncWriteStream::new(STDIO_BUFFER_SIZE, writer);
        Box::new(wrapped)
    }

    /// Asks the standard library whether the process stdout is a terminal.
    fn isatty(&self) -> bool {
        let handle = std::io::stdout();
        handle.is_terminal()
    }
}
/// Unit type standing for the host process's stderr; its `StdoutStream` impl writes to
/// `tokio::io::stderr()`.
pub struct Stderr;
/// Returns the `Stderr` marker value.
pub fn stderr() -> Stderr {
Stderr
}
impl StdoutStream for Stderr {
    /// Wraps the tokio stderr handle in an `AsyncWriteStream` constructed with
    /// `STDIO_BUFFER_SIZE` and returns it boxed.
    fn stream(&self) -> Box<dyn HostOutputStream> {
        let writer = tokio::io::stderr();
        let wrapped = AsyncWriteStream::new(STDIO_BUFFER_SIZE, writer);
        Box::new(wrapped)
    }

    /// Asks the standard library whether the process stderr is a terminal.
    fn isatty(&self) -> bool {
        let handle = std::io::stderr();
        handle.is_terminal()
    }
}
/// Two-variant yes/no marker.
///
/// NOTE(review): nothing in the visible part of this file uses this enum — the stream traits
/// here report terminal status as a plain `bool` from `isatty()`. Presumably it records
/// whether a stream is a TTY; confirm whether it is still needed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsATTY {
Yes,
No,
}
impl<T: WasiView> stdin::Host for T {
    /// Obtains a stream from the context's `stdin`, wraps it in `InputStream::Host`, and
    /// registers it in the resource table, returning the resulting handle.
    ///
    /// # Errors
    /// Propagates any error from pushing the resource into the table.
    fn get_stdin(&mut self) -> Result<Resource<streams::InputStream>, anyhow::Error> {
        let host_stream = self.ctx_mut().stdin.stream();
        let wrapped = streams::InputStream::Host(host_stream);
        let handle = self.table_mut().push_resource(wrapped)?;
        Ok(handle)
    }
}
impl<T: WasiView> stdout::Host for T {
    /// Obtains a stream from the context's `stdout` and registers it in the resource table,
    /// returning the resulting handle.
    ///
    /// # Errors
    /// Propagates any error from pushing the resource into the table.
    fn get_stdout(&mut self) -> Result<Resource<streams::OutputStream>, anyhow::Error> {
        let host_stream = self.ctx_mut().stdout.stream();
        let handle = self.table_mut().push_resource(host_stream)?;
        Ok(handle)
    }
}
impl<T: WasiView> stderr::Host for T {
    /// Obtains a stream from the context's `stderr` and registers it in the resource table,
    /// returning the resulting handle.
    ///
    /// # Errors
    /// Propagates any error from pushing the resource into the table.
    fn get_stderr(&mut self) -> Result<Resource<streams::OutputStream>, anyhow::Error> {
        let host_stream = self.ctx_mut().stderr.stream();
        let handle = self.table_mut().push_resource(host_stream)?;
        Ok(handle)
    }
}
/// Unit resource pushed into the table by `get_terminal_stdin` when the context's stdin
/// reports `isatty()`.
pub struct TerminalInput;
/// Unit resource pushed into the table by `get_terminal_stdout` / `get_terminal_stderr` when
/// the corresponding stream reports `isatty()`.
pub struct TerminalOutput;
// The `terminal_input` interface itself needs no host methods here.
impl<T: WasiView> terminal_input::Host for T {}

impl<T: WasiView> terminal_input::HostTerminalInput for T {
    /// Removes the given `TerminalInput` resource from the table.
    ///
    /// # Errors
    /// Propagates any error from deleting the resource.
    fn drop(&mut self, r: Resource<TerminalInput>) -> anyhow::Result<()> {
        let table = self.table_mut();
        table.delete_resource(r)?;
        Ok(())
    }
}
// The `terminal_output` interface itself needs no host methods here.
impl<T: WasiView> terminal_output::Host for T {}

impl<T: WasiView> terminal_output::HostTerminalOutput for T {
    /// Removes the given `TerminalOutput` resource from the table.
    ///
    /// # Errors
    /// Propagates any error from deleting the resource.
    fn drop(&mut self, r: Resource<TerminalOutput>) -> anyhow::Result<()> {
        let table = self.table_mut();
        table.delete_resource(r)?;
        Ok(())
    }
}
impl<T: WasiView> terminal_stdin::Host for T {
    /// Returns a fresh `TerminalInput` resource when the context's stdin reports being a
    /// terminal, and `None` otherwise.
    ///
    /// # Errors
    /// Propagates any error from pushing the resource into the table.
    fn get_terminal_stdin(&mut self) -> anyhow::Result<Option<Resource<TerminalInput>>> {
        // Not a terminal: nothing is added to the table.
        if !self.ctx().stdin.isatty() {
            return Ok(None);
        }
        let handle = self.table_mut().push_resource(TerminalInput)?;
        Ok(Some(handle))
    }
}
impl<T: WasiView> terminal_stdout::Host for T {
    /// Returns a fresh `TerminalOutput` resource when the context's stdout reports being a
    /// terminal, and `None` otherwise.
    ///
    /// # Errors
    /// Propagates any error from pushing the resource into the table.
    fn get_terminal_stdout(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
        // Not a terminal: nothing is added to the table.
        if !self.ctx().stdout.isatty() {
            return Ok(None);
        }
        let handle = self.table_mut().push_resource(TerminalOutput)?;
        Ok(Some(handle))
    }
}
impl<T: WasiView> terminal_stderr::Host for T {
    /// Returns a fresh `TerminalOutput` resource when the context's stderr reports being a
    /// terminal, and `None` otherwise.
    ///
    /// # Errors
    /// Propagates any error from pushing the resource into the table.
    fn get_terminal_stderr(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
        // Not a terminal: nothing is added to the table.
        if !self.ctx().stderr.isatty() {
            return Ok(None);
        }
        let handle = self.table_mut().push_resource(TerminalOutput)?;
        Ok(Some(handle))
    }
}
#[cfg(all(unix, test))]
mod test {
use crate::preview2::HostInputStream;
use libc;
use std::fs::File;
use std::io::{BufRead, BufReader, Write};
use std::os::fd::FromRawFd;
/// Forks the current process and runs `child` in the new process and `parent` in the
/// original one, connected by two pipes.
///
/// * The "stdin" pipe: its read end replaces the child's standard input (via `dup2`), and
///   its write end is handed to `parent` as a `File`.
/// * The "result" pipe: its write end is handed to `child` as a `File`, and its read end is
///   handed to `parent` wrapped in a `BufReader`.
///
/// # Panics
/// Panics if either pipe cannot be created, if `fork` fails, or (in the child) if stdin
/// cannot be redirected to the pipe.
fn test_child_stdin<T, P>(child: T, parent: P)
where
    T: FnOnce(File),
    P: FnOnce(File, BufReader<File>),
{
    // SAFETY: the libc calls only operate on descriptors created right here (plus the
    // process's own stdin in the child), and each raw descriptor is turned into a `File`
    // at most once per process, so no descriptor ends up with two owners.
    // NOTE(review): forking a multi-threaded test process is inherently delicate; the child
    // only sees the forking thread.
    unsafe {
        let mut stdin_fds: [libc::c_int; 2] = [0; 2];
        assert_eq!(
            libc::pipe(stdin_fds.as_mut_ptr()),
            0,
            "Failed to create stdin pipe"
        );
        let [stdin_read, stdin_write] = stdin_fds;

        let mut result_fds: [libc::c_int; 2] = [0; 2];
        assert_eq!(
            libc::pipe(result_fds.as_mut_ptr()),
            0,
            "Failed to create result pipe"
        );
        let [result_read, result_write] = result_fds;

        let child_pid = libc::fork();
        // Without this check a failed fork (-1) would fall into the parent branch and
        // surface later as a confusing mismatch on the first expected line.
        assert_ne!(child_pid, -1, "Failed to fork child process");

        if child_pid == 0 {
            // Child: drop the ends that belong to the parent.
            libc::close(stdin_write);
            libc::close(result_read);

            // Make the pipe's read end the child's standard input.
            libc::close(libc::STDIN_FILENO);
            assert_eq!(
                libc::dup2(stdin_read, libc::STDIN_FILENO),
                libc::STDIN_FILENO,
                "Failed to redirect child stdin to the pipe"
            );

            let result_write = File::from_raw_fd(result_write);
            child(result_write);
        } else {
            // Parent: drop the ends that belong to the child.
            libc::close(stdin_read);
            libc::close(result_write);

            let stdin_write = File::from_raw_fd(stdin_write);
            let result_read = BufReader::new(File::from_raw_fd(result_read));
            parent(stdin_write, result_read);
        }
    }
}
/// Exercises a stdin implementation across task and runtime restarts, using a forked child.
///
/// `mk_stdin` builds the `HostInputStream` under test. The child process reads lines from
/// its (pipe-backed) stdin through that stream and reports on the result pipe; the parent
/// drives it in lock-step. Commands understood by the child:
///
/// * `restart_task` — replies `restarting`, then recreates the stream inside the same
///   runtime.
/// * `restart_runtime` — replies `restarting`, then leaves the runtime and builds a new one.
/// * `all done` — replies `done` and stops.
/// * anything else — echoed back verbatim.
fn test_stdin_by_forking<S, T>(mk_stdin: T)
where
S: HostInputStream,
T: Fn() -> S,
{
test_child_stdin(
// Child side: `result_write` is the write end of the result pipe.
|mut result_write| {
let mut child_running = true;
// Each iteration of this loop builds a brand-new tokio runtime.
while child_running {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
// Each iteration of `'task` creates a fresh stdin stream.
'task: loop {
println!("child: creating stdin");
let mut stdin = mk_stdin();
println!("child: checking that stdin is not ready");
// The parent has not written anything yet, so `ready()` must not
// complete within the 100ms timeout.
assert!(
tokio::time::timeout(
std::time::Duration::from_millis(100),
stdin.ready()
)
.await
.is_err(),
"stdin available too soon"
);
// Tell the parent it may start sending input.
writeln!(&mut result_write, "start").unwrap();
println!("child: started");
// Accumulates input until a full line is available.
let mut buffer = String::new();
loop {
println!("child: waiting for stdin to be ready");
stdin.ready().await;
println!("child: reading input");
let bytes = stdin.read(1024).unwrap();
println!("child got: {:?}", bytes);
buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap());
// Only the first complete line in the buffer is handled per read;
// the parent sends one line at a time and waits for the reply.
if let Some((line, rest)) = buffer.split_once('\n') {
if line == "all done" {
writeln!(&mut result_write, "done").unwrap();
println!("child: exiting...");
child_running = false;
break 'task;
} else if line == "restart_runtime" {
writeln!(&mut result_write, "restarting").unwrap();
println!("child: restarting runtime...");
// Leaves `block_on`; the outer `while` builds a new runtime.
break 'task;
} else if line == "restart_task" {
writeln!(&mut result_write, "restarting").unwrap();
println!("child: restarting task...");
// Stays in the same runtime but recreates the stdin stream.
continue 'task;
} else {
// Ordinary input: echo the line back to the parent.
writeln!(&mut result_write, "{}", line).unwrap();
}
// Reached only after an echo; keep whatever followed the newline.
buffer = rest.to_owned();
}
}
}
});
println!("runtime exited");
}
println!("child exited");
},
// Parent side: writes to the child's stdin and checks each reply line.
|mut stdin_write, mut result_read| {
let mut line = String::new();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "start\n");
// Phase 1: plain echo with the first stream.
for i in 0..5 {
let message = format!("some bytes {}\n", i);
stdin_write.write_all(message.as_bytes()).unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, message);
}
// Phase 2: recreate the stream within the same runtime, then echo again.
writeln!(&mut stdin_write, "restart_task").unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "restarting\n");
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "start\n");
for i in 0..10 {
let message = format!("more bytes {}\n", i);
stdin_write.write_all(message.as_bytes()).unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, message);
}
// Phase 3: tear down and rebuild the runtime, then echo again.
writeln!(&mut stdin_write, "restart_runtime").unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "restarting\n");
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "start\n");
for i in 0..17 {
let message = format!("even more bytes {}\n", i);
stdin_write.write_all(message.as_bytes()).unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, message);
}
// Ask the child to finish and wait for its acknowledgement.
writeln!(&mut stdin_write, "all done").unwrap();
line.clear();
result_read.read_line(&mut line).unwrap();
assert_eq!(line, "done\n");
},
)
}
/// Runs the fork-based stdin scenario against `worker_thread_stdin::stdin`.
// NOTE(review): ignored on every architecture other than x86_64; the reason is not visible
// in this file — confirm before enabling elsewhere.
#[test]
#[cfg_attr(not(target_arch = "x86_64"), ignore)]
fn test_worker_thread_stdin() {
test_stdin_by_forking(super::worker_thread_stdin::stdin);
}
}