use bytes::{Buf, Bytes};
use futures::future::BoxFuture;
#[cfg(feature = "futures")]
use futures::Future;
use std::io::IoSlice;
use std::io::{self, Read, Seek, SeekFrom};
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite};
use tokio::sync::{mpsc, mpsc::error::TryRecvError};
use crate::{ArcFile, FsError, VirtualFile};
#[derive(Debug, Clone)]
pub struct Pipe {
send: PipeTx,
recv: PipeRx,
}
#[derive(Debug, Clone)]
pub struct PipeTx {
tx: Arc<Mutex<mpsc::UnboundedSender<Vec<u8>>>>,
block: bool,
}
#[derive(Debug, Clone)]
pub struct PipeRx {
rx: Arc<Mutex<PipeReceiver>>,
block: bool,
}
#[derive(Debug)]
struct PipeReceiver {
chan: mpsc::UnboundedReceiver<Vec<u8>>,
buffer: Option<Bytes>,
}
impl Pipe {
fn new() -> Self {
let (tx, rx) = mpsc::unbounded_channel();
Pipe {
send: PipeTx {
tx: Arc::new(Mutex::new(tx)),
block: true,
},
recv: PipeRx {
rx: Arc::new(Mutex::new(PipeReceiver {
chan: rx,
buffer: None,
})),
block: true,
},
}
}
pub fn channel() -> (Pipe, Pipe) {
let (tx1, rx1) = Pipe::new().split();
let (tx2, rx2) = Pipe::new().split();
let end1 = Pipe::combine(tx1, rx2);
let end2 = Pipe::combine(tx2, rx1);
(end1, end2)
}
pub fn split(self) -> (PipeTx, PipeRx) {
(self.send, self.recv)
}
pub fn combine(tx: PipeTx, rx: PipeRx) -> Self {
Self { send: tx, recv: rx }
}
}
impl From<Pipe> for PipeTx {
fn from(val: Pipe) -> Self {
val.send
}
}
impl From<Pipe> for PipeRx {
fn from(val: Pipe) -> Self {
val.recv
}
}
impl Pipe {
pub fn with_blocking(mut self, block: bool) -> Self {
self.set_blocking(block);
self
}
pub fn set_blocking(&mut self, block: bool) {
self.send.block = block;
self.recv.block = block;
}
pub fn close(&self) {
self.send.close();
}
}
impl PipeTx {
pub fn close(&self) {
let (mut null_tx, _) = mpsc::unbounded_channel();
{
let mut guard = self.tx.lock().unwrap();
std::mem::swap(guard.deref_mut(), &mut null_tx);
}
}
}
impl Seek for Pipe {
fn seek(&mut self, from: SeekFrom) -> io::Result<u64> {
self.recv.seek(from)
}
}
impl Seek for PipeRx {
fn seek(&mut self, _: SeekFrom) -> io::Result<u64> {
Ok(0)
}
}
impl Seek for PipeTx {
fn seek(&mut self, _: SeekFrom) -> io::Result<u64> {
Ok(0)
}
}
impl Read for Pipe {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.recv.read(buf)
}
}
impl Read for PipeRx {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let max_size = buf.len();
let mut rx = self.rx.lock().unwrap();
loop {
{
if let Some(read_buffer) = rx.buffer.as_mut() {
let buf_len = read_buffer.len();
if buf_len > 0 {
let mut read = buf_len.min(max_size);
let mut inner_buf = &read_buffer[..read];
read = Read::read(&mut inner_buf, buf)?;
read_buffer.advance(read);
return Ok(read);
}
}
}
let data = {
match self.block {
true => match rx.chan.blocking_recv() {
Some(a) => a,
None => {
return Ok(0);
}
},
false => match rx.chan.try_recv() {
Ok(a) => a,
Err(TryRecvError::Empty) => {
return Err(Into::<io::Error>::into(io::ErrorKind::WouldBlock));
}
Err(TryRecvError::Disconnected) => {
return Ok(0);
}
},
}
};
rx.buffer.replace(Bytes::from(data));
}
}
}
impl std::io::Write for Pipe {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.send.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
self.send.flush()
}
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.send.write_all(buf)
}
fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> io::Result<()> {
self.send.write_fmt(fmt)
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
self.send.write_vectored(bufs)
}
}
impl std::io::Write for PipeTx {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let tx = self.tx.lock().unwrap();
tx.send(buf.to_vec())
.map_err(|_| Into::<std::io::Error>::into(std::io::ErrorKind::BrokenPipe))?;
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl AsyncSeek for Pipe {
fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
let this = Pin::new(&mut self.recv);
this.start_seek(position)
}
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
let this = Pin::new(&mut self.recv);
this.poll_complete(cx)
}
}
impl AsyncSeek for PipeRx {
fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> io::Result<()> {
Ok(())
}
fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Poll::Ready(Ok(0))
}
}
impl AsyncSeek for PipeTx {
fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> io::Result<()> {
Ok(())
}
fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Poll::Ready(Ok(0))
}
}
impl AsyncWrite for Pipe {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = Pin::new(&mut self.send);
this.poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let this = Pin::new(&mut self.send);
this.poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
let this = Pin::new(&mut self.send);
this.poll_shutdown(cx)
}
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
let this = Pin::new(&mut self.send);
this.poll_write_vectored(cx, bufs)
}
}
impl AsyncWrite for PipeTx {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let guard = self.tx.lock().unwrap();
match guard.send(buf.to_vec()) {
Ok(()) => Poll::Ready(Ok(buf.len())),
Err(_) => Poll::Ready(Err(Into::<std::io::Error>::into(
std::io::ErrorKind::BrokenPipe,
))),
}
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.close();
Poll::Ready(Ok(()))
}
}
impl AsyncRead for Pipe {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let this = Pin::new(&mut self.recv);
this.poll_read(cx, buf)
}
}
impl AsyncRead for PipeRx {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let mut rx = self.rx.lock().unwrap();
loop {
{
if let Some(inner_buf) = rx.buffer.as_mut() {
let buf_len = inner_buf.len();
if buf_len > 0 {
let read = buf_len.min(buf.remaining());
buf.put_slice(&inner_buf[..read]);
inner_buf.advance(read);
return Poll::Ready(Ok(()));
}
}
}
let mut rx = Pin::new(rx.deref_mut());
let data = match rx.chan.poll_recv(cx) {
Poll::Ready(Some(a)) => a,
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => {
return match self.block {
true => Poll::Pending,
false => {
Poll::Ready(Err(Into::<io::Error>::into(io::ErrorKind::WouldBlock)))
}
}
}
};
rx.buffer.replace(Bytes::from(data));
}
}
}
impl VirtualFile for Pipe {
fn last_accessed(&self) -> u64 {
0
}
fn last_modified(&self) -> u64 {
0
}
fn created_time(&self) -> u64 {
0
}
fn size(&self) -> u64 {
0
}
fn set_len(&mut self, _new_size: u64) -> crate::Result<()> {
Ok(())
}
fn unlink(&mut self) -> BoxFuture<'static, Result<(), FsError>> {
Box::pin(async { Ok(()) })
}
fn is_open(&self) -> bool {
self.send
.tx
.try_lock()
.map(|a| !a.is_closed())
.unwrap_or_else(|_| true)
}
fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
let mut rx = self.recv.rx.lock().unwrap();
loop {
{
if let Some(inner_buf) = rx.buffer.as_mut() {
let buf_len = inner_buf.len();
if buf_len > 0 {
return Poll::Ready(Ok(buf_len));
}
}
}
let mut pinned_rx = Pin::new(&mut rx.chan);
let data = match pinned_rx.poll_recv(cx) {
Poll::Ready(Some(a)) => a,
Poll::Ready(None) => return Poll::Ready(Ok(0)),
Poll::Pending => {
return match self.recv.block {
true => Poll::Pending,
false => {
Poll::Ready(Err(Into::<io::Error>::into(io::ErrorKind::WouldBlock)))
}
}
}
};
rx.buffer.replace(Bytes::from(data));
}
}
fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
let tx = self.send.tx.lock().unwrap();
if tx.is_closed() {
Poll::Ready(Ok(0))
} else {
Poll::Ready(Ok(8192))
}
}
}
#[derive(Clone, Debug)]
pub struct DuplexPipe {
front: Pipe,
back: Pipe,
}
impl DuplexPipe {
pub fn front(&self) -> &Pipe {
&self.front
}
pub fn back(&self) -> &Pipe {
&self.back
}
pub fn front_mut(&mut self) -> &mut Pipe {
&mut self.front
}
pub fn back_mut(&mut self) -> &mut Pipe {
&mut self.back
}
pub fn split(self) -> (Pipe, Pipe) {
(self.front, self.back)
}
pub fn combine(front: Pipe, back: Pipe) -> Self {
Self { front, back }
}
pub fn reverse(self) -> Self {
let (front, back) = self.split();
Self::combine(back, front)
}
}
impl Default for DuplexPipe {
fn default() -> Self {
Self::new()
}
}
impl DuplexPipe {
pub fn new() -> DuplexPipe {
let (end1, end2) = Pipe::channel();
Self {
front: end1,
back: end2,
}
}
pub fn with_blocking(mut self, block: bool) -> Self {
self.set_blocking(block);
self
}
pub fn set_blocking(&mut self, block: bool) {
self.front.set_blocking(block);
self.back.set_blocking(block);
}
}
pub type WasiBidirectionalSharedPipePair = ArcFile<DuplexPipe>;