use crate::preview2::{Table, TableError};
use anyhow::Error;
use std::any::Any;
/// A readable byte stream.
///
/// Every method except [`InputStream::as_any`] and [`InputStream::readable`]
/// has a default implementation, so an implementor only overrides what it
/// actually supports. The default `read`/`read_vectored` fail with a `badf`
/// error.
#[async_trait::async_trait]
pub trait InputStream: Send + Sync {
    /// Exposes the implementor as `&dyn Any` so callers can attempt a
    /// downcast to the concrete stream type.
    fn as_any(&self) -> &dyn Any;

    /// Optional borrowed file descriptor for this stream; `None` by default.
    // NOTE(review): presumably used to poll the stream for readability — confirm with callers.
    #[cfg(unix)]
    fn pollable_read(&self) -> Option<rustix::fd::BorrowedFd> {
        None
    }

    /// Optional borrowed handle or socket for this stream; `None` by default.
    // NOTE(review): presumably used to poll the stream for readability — confirm with callers.
    #[cfg(windows)]
    fn pollable_read(&self) -> Option<io_extras::os::windows::BorrowedHandleOrSocket> {
        None
    }

    /// Reads into `_buf`, returning the number of bytes read and a flag that
    /// is `true` once the end of the stream has been observed.
    ///
    /// # Errors
    ///
    /// The default implementation always fails with a `badf` error.
    async fn read(&mut self, _buf: &mut [u8]) -> Result<(u64, bool), Error> {
        Err(anyhow::anyhow!("badf"))
    }

    /// Vectored variant of [`InputStream::read`].
    ///
    /// # Errors
    ///
    /// The default implementation always fails with a `badf` error.
    async fn read_vectored<'a>(
        &mut self,
        _bufs: &mut [std::io::IoSliceMut<'a>],
    ) -> Result<(u64, bool), Error> {
        Err(anyhow::anyhow!("badf"))
    }

    /// Reports whether vectored reads are supported; `false` by default.
    fn is_read_vectored(&self) -> bool {
        false
    }

    /// Discards up to `nelem` bytes from the stream.
    ///
    /// Returns the number of bytes consumed and whether the end of the stream
    /// was seen. The default implementation issues one single-byte `read` per
    /// requested element and stops early once `read` reports the end.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by `read`.
    async fn skip(&mut self, nelem: u64) -> Result<(u64, bool), Error> {
        let mut consumed = 0;
        let mut remaining = nelem;

        while remaining > 0 {
            remaining -= 1;

            // One-byte scratch buffer; the data itself is thrown away.
            let mut scratch = [0u8];
            let (count, at_end) = self.read(&mut scratch).await?;
            consumed += count;

            if at_end {
                return Ok((consumed, true));
            }
        }

        Ok((consumed, false))
    }

    /// Number of bytes that can be read right now; the default reports `0`.
    async fn num_ready_bytes(&self) -> Result<u64, Error> {
        Ok(0)
    }

    /// Required method with no default.
    // NOTE(review): presumably resolves once the stream is ready to be read — confirm with implementors.
    async fn readable(&self) -> Result<(), Error>;
}
/// A writable byte stream.
///
/// Every method except [`OutputStream::as_any`] and
/// [`OutputStream::writable`] has a default implementation, so an implementor
/// only overrides what it actually supports. The default
/// `write`/`write_vectored` fail with a `badf` error.
#[async_trait::async_trait]
pub trait OutputStream: Send + Sync {
    /// Exposes the implementor as `&dyn Any` so callers can attempt a
    /// downcast to the concrete stream type.
    fn as_any(&self) -> &dyn Any;

    /// Optional borrowed file descriptor for this stream; `None` by default.
    // NOTE(review): presumably used to poll the stream for writability — confirm with callers.
    #[cfg(unix)]
    fn pollable_write(&self) -> Option<rustix::fd::BorrowedFd> {
        None
    }

    /// Optional borrowed handle or socket for this stream; `None` by default.
    // NOTE(review): presumably used to poll the stream for writability — confirm with callers.
    #[cfg(windows)]
    fn pollable_write(&self) -> Option<io_extras::os::windows::BorrowedHandleOrSocket> {
        None
    }

    /// Writes bytes from `_buf`, returning how many bytes were written.
    ///
    /// # Errors
    ///
    /// The default implementation always fails with a `badf` error.
    async fn write(&mut self, _buf: &[u8]) -> Result<u64, Error> {
        Err(anyhow::anyhow!("badf"))
    }

    /// Vectored variant of [`OutputStream::write`].
    ///
    /// # Errors
    ///
    /// The default implementation always fails with a `badf` error.
    async fn write_vectored<'a>(&mut self, _bufs: &[std::io::IoSlice<'a>]) -> Result<u64, Error> {
        Err(anyhow::anyhow!("badf"))
    }

    /// Reports whether vectored writes are supported; `false` by default.
    fn is_write_vectored(&self) -> bool {
        false
    }

    /// Transfers up to `nelem` bytes from `src` into this stream.
    ///
    /// Returns the number of bytes read from `src` and whether `src` reported
    /// the end of its stream. The default implementation moves one byte per
    /// iteration and stops early once `src` reports the end.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by `src.read` or `self.write`.
    async fn splice(
        &mut self,
        src: &mut dyn InputStream,
        nelem: u64,
    ) -> Result<(u64, bool), Error> {
        let mut nspliced = 0;
        let mut saw_end = false;

        for _ in 0..nelem {
            let mut buf = [0u8];
            let (num, end) = src.read(&mut buf).await?;

            // Forward the byte only if the read actually produced one. A read
            // that yields zero bytes (for example when it only reports the end
            // of the stream) leaves `buf` at its initial value, and writing it
            // would inject a spurious `0` byte into the destination.
            if num > 0 {
                // The count returned by `write` is not inspected here; a short
                // write of this single byte is not retried.
                self.write(&buf).await?;
            }

            nspliced += num;
            if end {
                saw_end = true;
                break;
            }
        }

        Ok((nspliced, saw_end))
    }

    /// Writes up to `nelem` zero bytes, returning how many were written.
    ///
    /// The default implementation writes one byte per iteration and stops as
    /// soon as `write` reports that nothing was written.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by `write`.
    async fn write_zeroes(&mut self, nelem: u64) -> Result<u64, Error> {
        let mut nwritten = 0;

        for _ in 0..nelem {
            let num = self.write(&[0]).await?;
            if num == 0 {
                break;
            }
            nwritten += num;
        }

        Ok(nwritten)
    }

    /// Required method with no default.
    // NOTE(review): presumably resolves once the stream is ready to be written — confirm with implementors.
    async fn writable(&self) -> Result<(), Error>;
}
/// Extension methods for storing and retrieving boxed [`InputStream`] and
/// [`OutputStream`] trait objects, addressed by a `u32` index.
pub trait TableStreamExt {
/// Stores `istream` and returns the `u32` index it was assigned.
fn push_input_stream(&mut self, istream: Box<dyn InputStream>) -> Result<u32, TableError>;
/// Looks up the input stream stored at `fd` for shared access.
fn get_input_stream(&self, fd: u32) -> Result<&dyn InputStream, TableError>;
/// Looks up the input stream stored at `fd` for exclusive access.
fn get_input_stream_mut(&mut self, fd: u32) -> Result<&mut Box<dyn InputStream>, TableError>;
/// Stores `ostream` and returns the `u32` index it was assigned.
fn push_output_stream(&mut self, ostream: Box<dyn OutputStream>) -> Result<u32, TableError>;
/// Looks up the output stream stored at `fd` for shared access.
fn get_output_stream(&self, fd: u32) -> Result<&dyn OutputStream, TableError>;
/// Looks up the output stream stored at `fd` for exclusive access.
fn get_output_stream_mut(&mut self, fd: u32) -> Result<&mut Box<dyn OutputStream>, TableError>;
}
/// Stream storage on top of [`Table`]'s generic `push`/`get`/`get_mut`.
///
/// Each stream is stored as a `Box<dyn InputStream>` / `Box<dyn OutputStream>`
/// wrapped in one more `Box`, and looked up again with that same boxed
/// trait-object type as the type parameter.
impl TableStreamExt for Table {
    fn push_input_stream(&mut self, istream: Box<dyn InputStream>) -> Result<u32, TableError> {
        // The already-boxed trait object is boxed once more before insertion.
        self.push(Box::new(istream))
    }

    fn get_input_stream(&self, fd: u32) -> Result<&dyn InputStream, TableError> {
        let boxed = self.get::<Box<dyn InputStream>>(fd)?;
        // Hand out the trait object itself rather than the box around it.
        Ok(boxed.as_ref())
    }

    fn get_input_stream_mut(&mut self, fd: u32) -> Result<&mut Box<dyn InputStream>, TableError> {
        let boxed = self.get_mut::<Box<dyn InputStream>>(fd)?;
        Ok(boxed)
    }

    fn push_output_stream(&mut self, ostream: Box<dyn OutputStream>) -> Result<u32, TableError> {
        // The already-boxed trait object is boxed once more before insertion.
        self.push(Box::new(ostream))
    }

    fn get_output_stream(&self, fd: u32) -> Result<&dyn OutputStream, TableError> {
        let boxed = self.get::<Box<dyn OutputStream>>(fd)?;
        // Hand out the trait object itself rather than the box around it.
        Ok(boxed.as_ref())
    }

    fn get_output_stream_mut(&mut self, fd: u32) -> Result<&mut Box<dyn OutputStream>, TableError> {
        let boxed = self.get_mut::<Box<dyn OutputStream>>(fd)?;
        Ok(boxed)
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::preview2::pipe::{ReadPipe, WritePipe};

    /// An input stream pushed into a table can be fetched back through both
    /// the shared and the exclusive accessor.
    #[test]
    fn input_stream_in_table() {
        let mut table = Table::new();
        let source = ReadPipe::new(std::io::empty());

        let index = table.push_input_stream(Box::new(source)).unwrap();

        let _ = table.get_input_stream(index).unwrap();
        let _ = table.get_input_stream_mut(index).unwrap();
    }

    /// An output stream pushed into a table can be fetched back through both
    /// the shared and the exclusive accessor.
    #[test]
    fn output_stream_in_table() {
        let mut table = Table::new();
        let sink = WritePipe::new(std::io::sink());

        let index = table.push_output_stream(Box::new(sink)).unwrap();

        let _ = table.get_output_stream(index).unwrap();
        let _ = table.get_output_stream_mut(index).unwrap();
    }
}