wasmtime_wasi/stdio/
worker_thread_stdin.rs1use crate::stdio::StdinStream;
27use bytes::{Bytes, BytesMut};
28use std::io::{IsTerminal, Read};
29use std::mem;
30use std::sync::{Condvar, Mutex, OnceLock};
31use tokio::sync::Notify;
32use wasmtime_wasi_io::{
33 poll::Pollable,
34 streams::{InputStream, StreamError},
35};
36
37#[derive(Default)]
38struct GlobalStdin {
39 state: Mutex<StdinState>,
40 read_requested: Condvar,
41 read_completed: Notify,
42}
43
44#[derive(Default, Debug)]
45enum StdinState {
46 #[default]
47 ReadNotRequested,
48 ReadRequested,
49 Data(BytesMut),
50 Error(std::io::Error),
51 Closed,
52}
53
54impl GlobalStdin {
55 fn get() -> &'static GlobalStdin {
56 static STDIN: OnceLock<GlobalStdin> = OnceLock::new();
57 STDIN.get_or_init(|| create())
58 }
59}
60
61fn create() -> GlobalStdin {
62 std::thread::spawn(|| {
63 let state = GlobalStdin::get();
64 loop {
65 let mut lock = state.state.lock().unwrap();
68 lock = state
69 .read_requested
70 .wait_while(lock, |state| !matches!(state, StdinState::ReadRequested))
71 .unwrap();
72 drop(lock);
73
74 let mut bytes = BytesMut::zeroed(1024);
75 let (new_state, done) = match std::io::stdin().read(&mut bytes) {
76 Ok(0) => (StdinState::Closed, true),
77 Ok(nbytes) => {
78 bytes.truncate(nbytes);
79 (StdinState::Data(bytes), false)
80 }
81 Err(e) => (StdinState::Error(e), true),
82 };
83
84 debug_assert!(matches!(
87 *state.state.lock().unwrap(),
88 StdinState::ReadRequested
89 ));
90 *state.state.lock().unwrap() = new_state;
91 state.read_completed.notify_waiters();
92 if done {
93 break;
94 }
95 }
96 });
97
98 GlobalStdin::default()
99}
100
/// Handle to the host's standard input.
///
/// The type carries no data of its own: every handle operates on the same
/// process-wide state, so cloning is free.
#[derive(Clone, Debug)]
pub struct Stdin;
104
105pub fn stdin() -> Stdin {
110 Stdin
111}
112
113impl StdinStream for Stdin {
114 fn stream(&self) -> Box<dyn InputStream> {
115 Box::new(Stdin)
116 }
117
118 fn isatty(&self) -> bool {
119 std::io::stdin().is_terminal()
120 }
121}
122
123#[async_trait::async_trait]
124impl InputStream for Stdin {
125 fn read(&mut self, size: usize) -> Result<Bytes, StreamError> {
126 let g = GlobalStdin::get();
127 let mut locked = g.state.lock().unwrap();
128 match mem::replace(&mut *locked, StdinState::ReadRequested) {
129 StdinState::ReadNotRequested => {
130 g.read_requested.notify_one();
131 Ok(Bytes::new())
132 }
133 StdinState::ReadRequested => Ok(Bytes::new()),
134 StdinState::Data(mut data) => {
135 let size = data.len().min(size);
136 let bytes = data.split_to(size);
137 *locked = if data.is_empty() {
138 StdinState::ReadNotRequested
139 } else {
140 StdinState::Data(data)
141 };
142 Ok(bytes.freeze())
143 }
144 StdinState::Error(e) => {
145 *locked = StdinState::Closed;
146 Err(StreamError::LastOperationFailed(e.into()))
147 }
148 StdinState::Closed => {
149 *locked = StdinState::Closed;
150 Err(StreamError::Closed)
151 }
152 }
153 }
154}
155
156#[async_trait::async_trait]
157impl Pollable for Stdin {
158 async fn ready(&mut self) {
159 let g = GlobalStdin::get();
160
161 let notified = {
164 let mut locked = g.state.lock().unwrap();
165 match *locked {
166 StdinState::ReadNotRequested => {
168 g.read_requested.notify_one();
169 *locked = StdinState::ReadRequested;
170 g.read_completed.notified()
171 }
172 StdinState::ReadRequested => g.read_completed.notified(),
173 StdinState::Data(_) | StdinState::Closed | StdinState::Error(_) => return,
174 }
175 };
176
177 notified.await;
178 }
179}