//! Handling for standard input using a worker thread.
//!
//! Standard input is a process-wide singleton resource that needs special
//! care. This implementation currently adheres to a few constraints that make
//! it nontrivial:
//!
//! * Any number of guest wasm programs can read stdin. While this doesn't
//!   make much sense semantically, none of them should block forever.
//!   Instead, guests race to see who actually reads which parts of stdin.
//!
//! * Data isn't actually read from stdin until a guest requests it. This is
//!   done to be a good neighbor to others running in the process. Under the
//!   assumption that most programs have one "thing" which reads stdin, the
//!   consumption of bytes is delayed until a wasm guest is dynamically chosen
//!   to be that "thing". Until then no data is consumed from stdin, to avoid
//!   taking it from other components in the process.
//!
//! * Tokio's documentation indicates that "interactive stdin" is best done
//!   with a helper thread to avoid blocking shutdown of the event loop.
//!   That's respected here: all stdin reading happens on a blocking helper
//!   thread that, at this time, is never explicitly shut down (it exits only
//!   after EOF or a read error).
//!
//! This module is likely to change over time as new systems are encountered
//! and preexisting bugs are discovered.
use crate::preview2::poll::Subscribe;
use crate::preview2::stdio::StdinStream;
use crate::preview2::{HostInputStream, StreamError};
use bytes::{Bytes, BytesMut};
use std::io::{IsTerminal, Read};
use std::mem;
use std::sync::{Condvar, Mutex, OnceLock};
use tokio::sync::Notify;
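
/// Process-wide state shared between the worker thread and all `Stdin`
/// handles.
#[derive(Default)]
struct GlobalStdin {
    /// Current state of the read handshake.
    state: Mutex<StdinState>,
    /// Signaled when a guest requests a read; the worker thread waits on it.
    read_requested: Condvar,
    /// Signaled by the worker thread when a blocking read has finished.
    read_completed: Notify,
}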
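
#[derive(Default, Debug)]
enum StdinState {
    /// No read has been requested; the worker thread is idle.
    #[default]
    ReadNotRequested,
    /// A read was requested and the worker thread is, or soon will be,
    /// blocked reading stdin.
    ReadRequested,
    /// Bytes that were read from stdin but not yet consumed by a guest.
    Data(BytesMut),
    /// The last read failed; the error is reported to the next reader.
    Error(std::io::Error),
    /// Stdin reached EOF, or an error was already reported.
    Closed,
}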

impl GlobalStdin {
    /// Returns the global instance, spawning the worker thread on first use.
    fn get() -> &'static GlobalStdin {
        static STDIN: OnceLock<GlobalStdin> = OnceLock::new();
        STDIN.get_or_init(create)
    }
}

/// Spawns the worker thread and returns the state that it will operate on.
fn create() -> GlobalStdin {
    std::thread::spawn(|| {
        // `create` runs inside `OnceLock::get_or_init`, so this call blocks
        // until the initializer has returned and the global is set.
        let state = GlobalStdin::get();
        loop {
            // Wait for a read to be requested, but don't hold the lock across
            // the blocking read.
            let mut lock = state.state.lock().unwrap();
            lock = state
                .read_requested
                .wait_while(lock, |state| !matches!(state, StdinState::ReadRequested))
                .unwrap();
            drop(lock);

            let mut bytes = BytesMut::zeroed(1024);
            let (new_state, done) = match std::io::stdin().read(&mut bytes) {
                // EOF: stdin is closed and the worker thread can exit.
                Ok(0) => (StdinState::Closed, true),
                Ok(nbytes) => {
                    bytes.truncate(nbytes);
                    (StdinState::Data(bytes), false)
                }
                Err(e) => (StdinState::Error(e), true),
            };

            // After the blocking read completes the state should not have been
            // tampered with.
            debug_assert!(matches!(
                *state.state.lock().unwrap(),
                StdinState::ReadRequested
            ));
            *state.state.lock().unwrap() = new_state;
            state.read_completed.notify_waiters();
            if done {
                break;
            }
        }
    });

    GlobalStdin::default()
}

/// The only public interface is the [`HostInputStream`] impl.
#[derive(Clone)]
pub struct Stdin;

/// Returns a handle to the process-wide standard input.
pub fn stdin() -> Stdin {
    Stdin
}

impl StdinStream for Stdin {
    fn stream(&self) -> Box<dyn HostInputStream> {
        Box::new(Stdin)
    }

    fn isatty(&self) -> bool {
        std::io::stdin().is_terminal()
    }
}
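
#[async_trait::async_trait]
impl HostInputStream for Stdin {
    fn read(&mut self, size: usize) -> Result<Bytes, StreamError> {
        let g = GlobalStdin::get();
        let mut locked = g.state.lock().unwrap();
        // Optimistically mark a read as requested; the arms below overwrite
        // this wherever a different state is needed.
        match mem::replace(&mut *locked, StdinState::ReadRequested) {
            // Nothing is buffered yet: wake the worker thread and report that
            // no bytes are available right now.
            StdinState::ReadNotRequested => {
                g.read_requested.notify_one();
                Ok(Bytes::new())
            }
            // A read is already in flight, so there is nothing to return yet.
            StdinState::ReadRequested => Ok(Bytes::new()),
            // Hand out at most `size` bytes and keep the remainder buffered.
            StdinState::Data(mut data) => {
                let size = data.len().min(size);
                let bytes = data.split_to(size);
                *locked = if data.is_empty() {
                    StdinState::ReadNotRequested
                } else {
                    StdinState::Data(data)
                };
                Ok(bytes.freeze())
            }
            // Report the error once; later reads observe a closed stream.
            StdinState::Error(e) => {
                *locked = StdinState::Closed;
                Err(StreamError::LastOperationFailed(e.into()))
            }
            StdinState::Closed => {
                *locked = StdinState::Closed;
                Err(StreamError::Closed)
            }
        }
    }
}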

#[async_trait::async_trait]
impl Subscribe for Stdin {
    async fn ready(&mut self) {
        let g = GlobalStdin::get();

        // Scope the synchronous `state.lock()` to this block which does not
        // `.await` inside of it.
        let notified = {
            let mut locked = g.state.lock().unwrap();
            match *locked {
                // If a read isn't requested yet, request one and then wait
                // for the worker thread to complete it.
                StdinState::ReadNotRequested => {
                    g.read_requested.notify_one();
                    *locked = StdinState::ReadRequested;
                    g.read_completed.notified()
                }
                // A read is already in flight, so just wait for it.
                StdinState::ReadRequested => g.read_completed.notified(),
                // Data, EOF, or an error is already available to `read`.
                StdinState::Data(_) | StdinState::Closed | StdinState::Error(_) => return,
            }
        };

        notified.await;
    }
}
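
// A minimal, self-contained sketch of the request/complete handshake that the
// module docs describe, assuming only `std`. It is an illustration rather than
// a test of the real `GlobalStdin`: the names `Slot`, `Shared`, `new_shared`,
// `spawn_worker`, and `read_blocking` are hypothetical, a second `Condvar`
// stands in for `tokio::sync::Notify`, and any `Read` source stands in for the
// process's stdin so that nothing is consumed from the real one.
#[cfg(test)]
mod handshake_sketch {
    use std::io::Read;
    use std::mem;
    use std::sync::{Arc, Condvar, Mutex};
    use std::thread::{self, JoinHandle};

    #[derive(Debug, PartialEq)]
    pub enum Slot {
        Idle,
        Requested,
        Data(Vec<u8>),
        Closed,
    }

    pub struct Shared {
        slot: Mutex<Slot>,
        requested: Condvar,
        completed: Condvar,
    }

    pub fn new_shared() -> Arc<Shared> {
        Arc::new(Shared {
            slot: Mutex::new(Slot::Idle),
            requested: Condvar::new(),
            completed: Condvar::new(),
        })
    }

    /// Worker loop: reads from `source` only after a consumer has asked.
    pub fn spawn_worker<R>(shared: Arc<Shared>, mut source: R) -> JoinHandle<()>
    where
        R: Read + Send + 'static,
    {
        thread::spawn(move || loop {
            // Sleep until a read is requested, then release the lock so the
            // (potentially blocking) read does not happen under it.
            let guard = shared.slot.lock().unwrap();
            let guard = shared
                .requested
                .wait_while(guard, |slot| *slot != Slot::Requested)
                .unwrap();
            drop(guard);

            let mut buf = vec![0; 1024];
            // For brevity this sketch treats a read error like EOF.
            let (next, done) = match source.read(&mut buf) {
                Ok(0) | Err(_) => (Slot::Closed, true),
                Ok(n) => {
                    buf.truncate(n);
                    (Slot::Data(buf), false)
                }
            };
            *shared.slot.lock().unwrap() = next;
            shared.completed.notify_all();
            if done {
                break;
            }
        })
    }

    /// Requests a read if necessary and blocks until data or EOF is published.
    pub fn read_blocking(shared: &Shared) -> Option<Vec<u8>> {
        let mut guard = shared.slot.lock().unwrap();
        loop {
            match mem::replace(&mut *guard, Slot::Idle) {
                Slot::Data(bytes) => return Some(bytes),
                Slot::Closed => {
                    *guard = Slot::Closed;
                    return None;
                }
                Slot::Idle => {
                    *guard = Slot::Requested;
                    shared.requested.notify_one();
                }
                Slot::Requested => *guard = Slot::Requested,
            }
            guard = shared
                .completed
                .wait_while(guard, |slot| *slot == Slot::Requested)
                .unwrap();
        }
    }

    #[test]
    fn reads_only_on_demand() {
        let shared = new_shared();
        let source = std::io::Cursor::new(b"hello".to_vec());
        let worker = spawn_worker(shared.clone(), source);
        assert_eq!(read_blocking(&shared), Some(b"hello".to_vec()));
        assert_eq!(read_blocking(&shared), None);
        worker.join().unwrap();
        assert_eq!(read_blocking(&shared), None);
    }
}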