wasmtime_wasi/host/
io.rs

1use crate::{
2    bindings::sync::io::poll::Pollable,
3    bindings::sync::io::streams::{self, InputStream, OutputStream},
4    runtime::in_tokio,
5    IoImpl, IoView, StreamError, StreamResult,
6};
7use wasmtime::component::Resource;
8use wasmtime_wasi_io::bindings::wasi::io::streams::{
9    self as async_streams, Host as AsyncHost, HostInputStream as AsyncHostInputStream,
10    HostOutputStream as AsyncHostOutputStream,
11};
12
13impl From<async_streams::StreamError> for streams::StreamError {
14    fn from(other: async_streams::StreamError) -> Self {
15        match other {
16            async_streams::StreamError::LastOperationFailed(e) => Self::LastOperationFailed(e),
17            async_streams::StreamError::Closed => Self::Closed,
18        }
19    }
20}
21
22impl<T> streams::Host for IoImpl<T>
23where
24    T: IoView,
25{
26    fn convert_stream_error(&mut self, err: StreamError) -> anyhow::Result<streams::StreamError> {
27        Ok(AsyncHost::convert_stream_error(self, err)?.into())
28    }
29}
30
31impl<T> streams::HostOutputStream for IoImpl<T>
32where
33    T: IoView,
34{
35    fn drop(&mut self, stream: Resource<OutputStream>) -> anyhow::Result<()> {
36        in_tokio(async { AsyncHostOutputStream::drop(self, stream).await })
37    }
38
39    fn check_write(&mut self, stream: Resource<OutputStream>) -> StreamResult<u64> {
40        Ok(AsyncHostOutputStream::check_write(self, stream)?)
41    }
42
43    fn write(&mut self, stream: Resource<OutputStream>, bytes: Vec<u8>) -> StreamResult<()> {
44        Ok(AsyncHostOutputStream::write(self, stream, bytes)?)
45    }
46
47    fn blocking_write_and_flush(
48        &mut self,
49        stream: Resource<OutputStream>,
50        bytes: Vec<u8>,
51    ) -> StreamResult<()> {
52        in_tokio(async {
53            AsyncHostOutputStream::blocking_write_and_flush(self, stream, bytes).await
54        })
55    }
56
57    fn blocking_write_zeroes_and_flush(
58        &mut self,
59        stream: Resource<OutputStream>,
60        len: u64,
61    ) -> StreamResult<()> {
62        in_tokio(async {
63            AsyncHostOutputStream::blocking_write_zeroes_and_flush(self, stream, len).await
64        })
65    }
66
67    fn subscribe(&mut self, stream: Resource<OutputStream>) -> anyhow::Result<Resource<Pollable>> {
68        Ok(AsyncHostOutputStream::subscribe(self, stream)?)
69    }
70
71    fn write_zeroes(&mut self, stream: Resource<OutputStream>, len: u64) -> StreamResult<()> {
72        Ok(AsyncHostOutputStream::write_zeroes(self, stream, len)?)
73    }
74
75    fn flush(&mut self, stream: Resource<OutputStream>) -> StreamResult<()> {
76        Ok(AsyncHostOutputStream::flush(
77            self,
78            Resource::new_borrow(stream.rep()),
79        )?)
80    }
81
82    fn blocking_flush(&mut self, stream: Resource<OutputStream>) -> StreamResult<()> {
83        in_tokio(async {
84            AsyncHostOutputStream::blocking_flush(self, Resource::new_borrow(stream.rep())).await
85        })
86    }
87
88    fn splice(
89        &mut self,
90        dst: Resource<OutputStream>,
91        src: Resource<InputStream>,
92        len: u64,
93    ) -> StreamResult<u64> {
94        AsyncHostOutputStream::splice(self, dst, src, len)
95    }
96
97    fn blocking_splice(
98        &mut self,
99        dst: Resource<OutputStream>,
100        src: Resource<InputStream>,
101        len: u64,
102    ) -> StreamResult<u64> {
103        in_tokio(async { AsyncHostOutputStream::blocking_splice(self, dst, src, len).await })
104    }
105}
106
107impl<T> streams::HostInputStream for IoImpl<T>
108where
109    T: IoView,
110{
111    fn drop(&mut self, stream: Resource<InputStream>) -> anyhow::Result<()> {
112        in_tokio(async { AsyncHostInputStream::drop(self, stream).await })
113    }
114
115    fn read(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<Vec<u8>> {
116        AsyncHostInputStream::read(self, stream, len)
117    }
118
119    fn blocking_read(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<Vec<u8>> {
120        in_tokio(async { AsyncHostInputStream::blocking_read(self, stream, len).await })
121    }
122
123    fn skip(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<u64> {
124        AsyncHostInputStream::skip(self, stream, len)
125    }
126
127    fn blocking_skip(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<u64> {
128        in_tokio(async { AsyncHostInputStream::blocking_skip(self, stream, len).await })
129    }
130
131    fn subscribe(&mut self, stream: Resource<InputStream>) -> anyhow::Result<Resource<Pollable>> {
132        AsyncHostInputStream::subscribe(self, stream)
133    }
134}