1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
use crate::preview2::{Table, TableError};
use anyhow::Error;
use std::any::Any;

/// An input bytestream.
///
/// This is "pseudo" because the real streams will be a type in wit, and
/// built into the wit bindings, and will support async and type parameters.
/// This pseudo-stream abstraction is synchronous and only supports bytes.
#[async_trait::async_trait]
pub trait InputStream: Send + Sync {
    fn as_any(&self) -> &dyn Any;

    /// If this stream is reading from a host file descriptor, return it so
    /// that it can be polled with a host poll.
    #[cfg(unix)]
    fn pollable_read(&self) -> Option<rustix::fd::BorrowedFd> {
        None
    }

    /// If this stream is reading from a host file descriptor, return it so
    /// that it can be polled with a host poll.
    #[cfg(windows)]
    fn pollable_read(&self) -> Option<io_extras::os::windows::BorrowedHandleOrSocket> {
        None
    }

    /// Read bytes. On success, returns a pair holding the number of bytes read
    /// and a flag indicating whether the end of the stream was reached.
    async fn read(&mut self, _buf: &mut [u8]) -> Result<(u64, bool), Error> {
        Err(anyhow::anyhow!("badf"))
    }

    /// Vectored-I/O form of `read`.
    async fn read_vectored<'a>(
        &mut self,
        _bufs: &mut [std::io::IoSliceMut<'a>],
    ) -> Result<(u64, bool), Error> {
        Err(anyhow::anyhow!("badf"))
    }

    /// Test whether vectored I/O reads are known to be optimized in the
    /// underlying implementation.
    fn is_read_vectored(&self) -> bool {
        false
    }

    /// Read bytes from a stream and discard them.
    async fn skip(&mut self, nelem: u64) -> Result<(u64, bool), Error> {
        let mut nread = 0;
        let mut saw_end = false;

        // TODO: Optimize by reading more than one byte at a time.
        for _ in 0..nelem {
            let (num, end) = self.read(&mut [0]).await?;
            nread += num;
            if end {
                saw_end = true;
                break;
            }
        }

        Ok((nread, saw_end))
    }

    /// Return the number of bytes that may be read without blocking.
    async fn num_ready_bytes(&self) -> Result<u64, Error> {
        Ok(0)
    }

    /// Test whether this stream is readable.
    async fn readable(&self) -> Result<(), Error>;
}

/// An output bytestream.
///
/// This is "pseudo" because the real streams will be a type in wit, and
/// built into the wit bindings, and will support type parameters. This
/// pseudo-stream abstraction only supports bytes; its I/O methods are
/// `async`.
#[async_trait::async_trait]
pub trait OutputStream: Send + Sync {
    /// View this stream as `&dyn Any`, which lets a caller attempt a
    /// downcast to the concrete stream type.
    fn as_any(&self) -> &dyn Any;

    /// If this stream is writing to a host file descriptor, return it so
    /// that it can be polled with a host poll.
    ///
    /// The default implementation returns `None`.
    #[cfg(unix)]
    fn pollable_write(&self) -> Option<rustix::fd::BorrowedFd> {
        None
    }

    /// If this stream is writing to a host handle or socket, return it so
    /// that it can be polled with a host poll.
    ///
    /// The default implementation returns `None`.
    #[cfg(windows)]
    fn pollable_write(&self) -> Option<io_extras::os::windows::BorrowedHandleOrSocket> {
        None
    }

    /// Write bytes. On success, returns the number of bytes written.
    ///
    /// The default implementation always fails with a "badf" error.
    async fn write(&mut self, _buf: &[u8]) -> Result<u64, Error> {
        Err(anyhow::anyhow!("badf"))
    }

    /// Vectored-I/O form of `write`.
    ///
    /// The default implementation always fails with a "badf" error.
    async fn write_vectored<'a>(&mut self, _bufs: &[std::io::IoSlice<'a>]) -> Result<u64, Error> {
        Err(anyhow::anyhow!("badf"))
    }

    /// Test whether vectored I/O writes are known to be optimized in the
    /// underlying implementation. The default implementation returns `false`.
    fn is_write_vectored(&self) -> bool {
        false
    }

    /// Transfer up to `nelem` bytes directly from an input stream to this
    /// output stream.
    ///
    /// On success, returns a pair holding the number of bytes this stream
    /// reported as written and a flag indicating whether `src` reported the
    /// end of its stream.
    ///
    /// Only bytes that `src.read` actually produced are forwarded: a read
    /// that yields no data (for example one that only signals end-of-stream)
    /// writes nothing. Splicing stops early when `src` reaches its end or
    /// when this stream accepts zero bytes; in the latter case the byte that
    /// was already taken from `src` cannot be handed back and is lost.
    ///
    /// # Errors
    ///
    /// Any error from `src.read` or `self.write` is returned immediately.
    async fn splice(
        &mut self,
        src: &mut dyn InputStream,
        nelem: u64,
    ) -> Result<(u64, bool), Error> {
        let mut nspliced = 0;
        let mut saw_end = false;

        // TODO: Optimize by splicing more than one byte at a time.
        for _ in 0..nelem {
            let mut buf = [0u8];
            let (num, end) = src.read(&mut buf).await?;
            saw_end = end;

            // Clamp the reported count to the buffer size. After the `min`
            // the value is at most `buf.len()`, so both casts are lossless.
            let filled = num.min(buf.len() as u64) as usize;

            let mut sink_stalled = false;
            if filled > 0 {
                // Forward only what was read; writing the whole scratch
                // buffer would emit a stray zero byte after an empty read.
                let written = self.write(&buf[..filled]).await?;
                nspliced += written;
                sink_stalled = written == 0;
            }

            if end || sink_stalled {
                break;
            }
        }

        Ok((nspliced, saw_end))
    }

    /// Repeatedly write a zero byte to the stream, up to `nelem` times.
    ///
    /// Returns the total number of bytes the underlying `write` calls
    /// reported as written. Stops early once a `write` reports zero bytes.
    ///
    /// # Errors
    ///
    /// Any error from `self.write` is returned immediately.
    async fn write_zeroes(&mut self, nelem: u64) -> Result<u64, Error> {
        let mut nwritten = 0;

        // TODO: Optimize by writing more than one byte at a time.
        for _ in 0..nelem {
            let num = self.write(&[0]).await?;
            if num == 0 {
                break;
            }
            nwritten += num;
        }

        Ok(nwritten)
    }

    /// Test whether this stream is writable.
    ///
    /// There is no default; every implementor supplies this.
    async fn writable(&self) -> Result<(), Error>;
}

/// Extension methods for storing boxed [`InputStream`] and [`OutputStream`]
/// trait objects in a table and looking them up again by a `u32` index.
///
/// Every method reports failure as a `TableError`; the precise failure
/// conditions are determined by the implementor.
pub trait TableStreamExt {
    /// Store a boxed input stream and return the `u32` index under which it
    /// can later be looked up.
    fn push_input_stream(&mut self, istream: Box<dyn InputStream>) -> Result<u32, TableError>;
    /// Borrow the input stream stored at index `fd`.
    fn get_input_stream(&self, fd: u32) -> Result<&dyn InputStream, TableError>;
    /// Mutably borrow the boxed input stream stored at index `fd`.
    fn get_input_stream_mut(&mut self, fd: u32) -> Result<&mut Box<dyn InputStream>, TableError>;

    /// Store a boxed output stream and return the `u32` index under which it
    /// can later be looked up.
    fn push_output_stream(&mut self, ostream: Box<dyn OutputStream>) -> Result<u32, TableError>;
    /// Borrow the output stream stored at index `fd`.
    fn get_output_stream(&self, fd: u32) -> Result<&dyn OutputStream, TableError>;
    /// Mutably borrow the boxed output stream stored at index `fd`.
    fn get_output_stream_mut(&mut self, fd: u32) -> Result<&mut Box<dyn OutputStream>, TableError>;
}
/// Streams are kept in the [`Table`] as `Box<dyn InputStream>` /
/// `Box<dyn OutputStream>` values, so every lookup asks the table for exactly
/// that boxed type.
impl TableStreamExt for Table {
    fn push_input_stream(&mut self, istream: Box<dyn InputStream>) -> Result<u32, TableError> {
        // The boxed trait object is itself the stored value, hence the
        // second layer of `Box`.
        let entry = Box::new(istream);
        self.push(entry)
    }
    fn get_input_stream(&self, fd: u32) -> Result<&dyn InputStream, TableError> {
        let boxed = self.get::<Box<dyn InputStream>>(fd)?;
        // Peel off the `Box` to hand out a plain trait-object reference.
        Ok(&**boxed)
    }
    fn get_input_stream_mut(&mut self, fd: u32) -> Result<&mut Box<dyn InputStream>, TableError> {
        let boxed = self.get_mut::<Box<dyn InputStream>>(fd)?;
        Ok(boxed)
    }

    fn push_output_stream(&mut self, ostream: Box<dyn OutputStream>) -> Result<u32, TableError> {
        // Same double boxing as for input streams.
        let entry = Box::new(ostream);
        self.push(entry)
    }
    fn get_output_stream(&self, fd: u32) -> Result<&dyn OutputStream, TableError> {
        let boxed = self.get::<Box<dyn OutputStream>>(fd)?;
        // Peel off the `Box` to hand out a plain trait-object reference.
        Ok(&**boxed)
    }
    fn get_output_stream_mut(&mut self, fd: u32) -> Result<&mut Box<dyn OutputStream>, TableError> {
        let boxed = self.get_mut::<Box<dyn OutputStream>>(fd)?;
        Ok(boxed)
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::preview2::pipe::{ReadPipe, WritePipe};

    /// An input stream pushed into a `Table` can be fetched back, both
    /// immutably and mutably, through the index that the push returned.
    #[test]
    fn input_stream_in_table() {
        let mut streams = Table::new();
        let source: Box<dyn InputStream> = Box::new(ReadPipe::new(std::io::empty()));

        let index = streams.push_input_stream(source).unwrap();

        streams.get_input_stream(index).unwrap();
        streams.get_input_stream_mut(index).unwrap();
    }

    /// An output stream pushed into a `Table` can be fetched back, both
    /// immutably and mutably, through the index that the push returned.
    #[test]
    fn output_stream_in_table() {
        let mut streams = Table::new();
        let sink: Box<dyn OutputStream> = Box::new(WritePipe::new(std::io::sink()));

        let index = streams.push_output_stream(sink).unwrap();

        streams.get_output_stream(index).unwrap();
        streams.get_output_stream_mut(index).unwrap();
    }
}