1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
//! Virtual pipes.
//!
//! These types provide easy implementations of `InputStream` and `OutputStream` that mimic much
//! of the behavior of Unix pipes. These are particularly helpful for redirecting WASI stdio
//! handles to destinations other than OS files.
//!
//! Some convenience constructors are included for common backing types like `Vec<u8>` and `String`,
//! but the virtual pipes can be instantiated with any `Read + ReadReady` or `Write` type.
//!
use crate::preview2::stream::{InputStream, OutputStream};
use anyhow::Error;
use std::any::Any;
use std::convert::TryInto;
use std::io::{self, Read, Write};
use std::sync::{Arc, RwLock};
use system_interface::io::ReadReady;
/// A virtual pipe read end.
///
/// This reads from a source that implements the [`Read`] trait. It
/// also requires the [`ReadReady`] trait, which is implemented for many
/// popular `Read`-implementing types and is easy to implement for new
/// types.
///
/// The source is stored behind an `Arc<RwLock<_>>`, so clones of a
/// `ReadPipe` all read from the same underlying source.
///
/// A variety of `From` impls are provided so that common pipe types are
/// easy to create. For example:
///
/// ```
/// use wasmtime_wasi::preview2::{pipe::ReadPipe, WasiCtx};
/// let stdin = ReadPipe::from("hello from stdin!");
/// let builder = WasiCtx::builder().set_stdin(stdin);
/// ```
#[derive(Debug)]
pub struct ReadPipe<R: Read + ReadReady> {
// Shared, lock-guarded handle to the underlying reader.
reader: Arc<RwLock<R>>,
}
impl<R: Read + ReadReady> Clone for ReadPipe<R> {
    /// Produce another handle to the same underlying reader.
    ///
    /// Only the `Arc` reference count is bumped; the reader itself is not
    /// duplicated, so both handles observe the same read position.
    fn clone(&self) -> Self {
        let reader = Arc::clone(&self.reader);
        Self { reader }
    }
}
impl<R: Read + ReadReady> ReadPipe<R> {
    /// Wrap an owned `Read` value in a fresh pipe.
    ///
    /// Every read performed through the pipe is forwarded to `r`.
    pub fn new(r: R) -> Self {
        let shared = Arc::new(RwLock::new(r));
        Self::from_shared(shared)
    }

    /// Build a pipe around a reader that is already shared.
    ///
    /// Every read performed through the pipe is forwarded to the reader
    /// behind `reader`; other holders of the `Arc` see the same state.
    pub fn from_shared(reader: Arc<RwLock<R>>) -> Self {
        Self { reader }
    }

    /// Try to convert this `ReadPipe<R>` back into the underlying `R`.
    ///
    /// Returns `Err(self)` unchanged if other references to the underlying
    /// `R` still exist.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the reader has been poisoned.
    pub fn try_into_inner(self) -> Result<R, Self> {
        Arc::try_unwrap(self.reader)
            .map(|lock| lock.into_inner().unwrap())
            .map_err(|reader| Self { reader })
    }

    /// Take the write lock on the shared reader.
    ///
    /// A write guard is needed even for reading because `Read::read`
    /// requires `&mut R`. Panics if the lock has been poisoned.
    fn borrow(&self) -> std::sync::RwLockWriteGuard<'_, R> {
        let lock: &RwLock<R> = &self.reader;
        lock.write().unwrap()
    }
}
impl From<Vec<u8>> for ReadPipe<io::Cursor<Vec<u8>>> {
    /// Build a pipe that reads the bytes of an owned buffer from the start.
    fn from(bytes: Vec<u8>) -> Self {
        let cursor = io::Cursor::new(bytes);
        Self::new(cursor)
    }
}
impl From<&[u8]> for ReadPipe<io::Cursor<Vec<u8>>> {
    /// Build a pipe from a borrowed byte slice.
    ///
    /// The slice is copied into an owned `Vec<u8>`, then handed to the
    /// `From<Vec<u8>>` conversion.
    fn from(r: &[u8]) -> Self {
        let owned: Vec<u8> = r.to_vec();
        owned.into()
    }
}
impl From<String> for ReadPipe<io::Cursor<String>> {
    /// Build a pipe that reads the UTF-8 bytes of an owned string from the start.
    fn from(text: String) -> Self {
        let cursor = io::Cursor::new(text);
        Self::new(cursor)
    }
}
impl From<&str> for ReadPipe<io::Cursor<String>> {
    /// Build a pipe from a borrowed string slice.
    ///
    /// The text is copied into an owned `String`, then handed to the
    /// `From<String>` conversion.
    fn from(r: &str) -> Self {
        let owned = String::from(r);
        owned.into()
    }
}
#[async_trait::async_trait]
impl<R: Read + ReadReady + Any + Send + Sync> InputStream for ReadPipe<R> {
    /// Expose the pipe as `&dyn Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Report how many bytes the underlying reader says are ready, as given
    /// by its `ReadReady::num_ready_bytes` implementation.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error returned by the underlying reader.
    async fn num_ready_bytes(&self) -> Result<u64, Error> {
        Ok(self.borrow().num_ready_bytes()?)
    }

    /// Read into `buf`, returning `(bytes_read, end_of_stream)`.
    ///
    /// * A zero-length `buf` yields `(0, false)` without touching the reader.
    /// * A read of zero bytes into a non-empty `buf` yields `(0, true)`.
    /// * An `Interrupted` error is reported as `(0, false)` so the caller can
    ///   simply retry.
    ///
    /// # Errors
    ///
    /// Any other I/O error from the underlying reader is returned, as is a
    /// failure to convert the byte count to `u64`.
    async fn read(&mut self, buf: &mut [u8]) -> Result<(u64, bool), Error> {
        // `Read::read` returns `Ok(0)` for an empty destination regardless of
        // whether the source is exhausted, so that case must not be mistaken
        // for end-of-stream.
        if buf.is_empty() {
            return Ok((0, false));
        }
        match self.borrow().read(buf) {
            Ok(0) => Ok((0, true)),
            Ok(n) => Ok((n.try_into()?, false)),
            Err(e) if e.kind() == io::ErrorKind::Interrupted => Ok((0, false)),
            Err(e) => Err(e.into()),
        }
    }

    /// Discard up to `nelem` bytes from the reader, returning
    /// `(bytes_skipped, end_of_stream)`.
    ///
    /// The bytes are copied into `io::sink()`. The end-of-stream flag is set
    /// when fewer than `nelem` bytes could be skipped.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error reported by `io::copy`.
    async fn skip(&mut self, nelem: u64) -> Result<(u64, bool), Error> {
        let num = io::copy(
            &mut io::Read::take(&mut *self.borrow(), nelem),
            &mut io::sink(),
        )?;
        Ok((num, num < nelem))
    }

    /// Returns `Ok(())` immediately; this implementation never waits.
    async fn readable(&self) -> Result<(), Error> {
        Ok(())
    }
}
/// A virtual pipe write end.
///
/// This writes to a destination that implements the [`Write`] trait. The
/// destination is stored behind an `Arc<RwLock<_>>`, so clones of a
/// `WritePipe` all write to the same underlying destination.
///
/// ```no_run
/// use wasmtime_wasi::preview2::{pipe::WritePipe, WasiCtx, Table};
/// let mut table = Table::new();
/// let stdout = WritePipe::new_in_memory();
/// let mut ctx = WasiCtx::builder().set_stdout(stdout.clone()).build(&mut table).unwrap();
/// // use ctx and table in an instance, then make sure it is dropped:
/// drop(ctx);
/// drop(table);
/// let contents: Vec<u8> = stdout.try_into_inner().expect("sole remaining reference to WritePipe").into_inner();
/// println!("contents of stdout: {:?}", contents);
/// ```
#[derive(Debug)]
pub struct WritePipe<W: Write> {
// Shared, lock-guarded handle to the underlying writer.
writer: Arc<RwLock<W>>,
}
impl<W: Write> Clone for WritePipe<W> {
    /// Produce another handle to the same underlying writer.
    ///
    /// Only the `Arc` reference count is bumped; the writer itself is not
    /// duplicated, so output from every clone lands in the same place.
    fn clone(&self) -> Self {
        let writer = Arc::clone(&self.writer);
        Self { writer }
    }
}
impl<W: Write> WritePipe<W> {
    /// Wrap an owned `Write` value in a fresh pipe.
    ///
    /// Every write performed through the pipe is forwarded to `w`.
    pub fn new(w: W) -> Self {
        let shared = Arc::new(RwLock::new(w));
        Self::from_shared(shared)
    }

    /// Build a pipe around a writer that is already shared.
    ///
    /// Every write performed through the pipe is forwarded to the writer
    /// behind `writer`; other holders of the `Arc` see the same state.
    pub fn from_shared(writer: Arc<RwLock<W>>) -> Self {
        Self { writer }
    }

    /// Try to convert this `WritePipe<W>` back into the underlying `W`.
    ///
    /// Returns `Err(self)` unchanged if other references to the underlying
    /// `W` still exist.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the writer has been poisoned.
    pub fn try_into_inner(self) -> Result<W, Self> {
        Arc::try_unwrap(self.writer)
            .map(|lock| lock.into_inner().unwrap())
            .map_err(|writer| Self { writer })
    }

    /// Take the write lock on the shared writer.
    ///
    /// Panics if the lock has been poisoned.
    fn borrow(&self) -> std::sync::RwLockWriteGuard<'_, W> {
        let lock: &RwLock<W> = &self.writer;
        lock.write().unwrap()
    }
}
impl WritePipe<io::Cursor<Vec<u8>>> {
    /// Create a writable virtual pipe whose output accumulates in an
    /// initially empty in-memory `Vec<u8>`.
    pub fn new_in_memory() -> Self {
        let buffer: Vec<u8> = Vec::new();
        Self::new(io::Cursor::new(buffer))
    }
}
#[async_trait::async_trait]
impl<W: Write + Any + Send + Sync> OutputStream for WritePipe<W> {
    /// Expose the pipe as `&dyn Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Hand `buf` to the underlying writer with a single `Write::write` call.
    ///
    /// Returns the number of bytes the writer accepted. Errors from the
    /// writer, or from converting the count to `u64`, are propagated.
    async fn write(&mut self, buf: &[u8]) -> Result<u64, Error> {
        let accepted: usize = self.borrow().write(buf)?;
        let accepted: u64 = accepted.try_into()?;
        Ok(accepted)
    }

    // TODO: Optimize for pipes.
    /*
    async fn splice(
        &mut self,
        src: &mut dyn InputStream,
        nelem: u64,
    ) -> Result<u64, Error> {
        todo!()
    }
    */

    /// Write `nelem` zero bytes to the underlying writer.
    ///
    /// The zeroes come from `io::repeat(0)` capped at `nelem` and are moved
    /// with `io::copy`; the number of bytes copied is returned.
    async fn write_zeroes(&mut self, nelem: u64) -> Result<u64, Error> {
        let mut zeroes = io::Read::take(io::repeat(0), nelem);
        let copied = io::copy(&mut zeroes, &mut *self.borrow())?;
        Ok(copied)
    }

    /// Returns `Ok(())` immediately; this implementation never waits.
    async fn writable(&self) -> Result<(), Error> {
        Ok(())
    }
}