1use crate::{
2 bindings::sync::io::poll::Pollable,
3 bindings::sync::io::streams::{self, InputStream, OutputStream},
4 runtime::in_tokio,
5 IoImpl, IoView, StreamError, StreamResult,
6};
7use wasmtime::component::Resource;
8use wasmtime_wasi_io::bindings::wasi::io::streams::{
9 self as async_streams, Host as AsyncHost, HostInputStream as AsyncHostInputStream,
10 HostOutputStream as AsyncHostOutputStream,
11};
12
13impl From<async_streams::StreamError> for streams::StreamError {
14 fn from(other: async_streams::StreamError) -> Self {
15 match other {
16 async_streams::StreamError::LastOperationFailed(e) => Self::LastOperationFailed(e),
17 async_streams::StreamError::Closed => Self::Closed,
18 }
19 }
20}
21
22impl<T> streams::Host for IoImpl<T>
23where
24 T: IoView,
25{
26 fn convert_stream_error(&mut self, err: StreamError) -> anyhow::Result<streams::StreamError> {
27 Ok(AsyncHost::convert_stream_error(self, err)?.into())
28 }
29}
30
31impl<T> streams::HostOutputStream for IoImpl<T>
32where
33 T: IoView,
34{
35 fn drop(&mut self, stream: Resource<OutputStream>) -> anyhow::Result<()> {
36 in_tokio(async { AsyncHostOutputStream::drop(self, stream).await })
37 }
38
39 fn check_write(&mut self, stream: Resource<OutputStream>) -> StreamResult<u64> {
40 Ok(AsyncHostOutputStream::check_write(self, stream)?)
41 }
42
43 fn write(&mut self, stream: Resource<OutputStream>, bytes: Vec<u8>) -> StreamResult<()> {
44 Ok(AsyncHostOutputStream::write(self, stream, bytes)?)
45 }
46
47 fn blocking_write_and_flush(
48 &mut self,
49 stream: Resource<OutputStream>,
50 bytes: Vec<u8>,
51 ) -> StreamResult<()> {
52 in_tokio(async {
53 AsyncHostOutputStream::blocking_write_and_flush(self, stream, bytes).await
54 })
55 }
56
57 fn blocking_write_zeroes_and_flush(
58 &mut self,
59 stream: Resource<OutputStream>,
60 len: u64,
61 ) -> StreamResult<()> {
62 in_tokio(async {
63 AsyncHostOutputStream::blocking_write_zeroes_and_flush(self, stream, len).await
64 })
65 }
66
67 fn subscribe(&mut self, stream: Resource<OutputStream>) -> anyhow::Result<Resource<Pollable>> {
68 Ok(AsyncHostOutputStream::subscribe(self, stream)?)
69 }
70
71 fn write_zeroes(&mut self, stream: Resource<OutputStream>, len: u64) -> StreamResult<()> {
72 Ok(AsyncHostOutputStream::write_zeroes(self, stream, len)?)
73 }
74
75 fn flush(&mut self, stream: Resource<OutputStream>) -> StreamResult<()> {
76 Ok(AsyncHostOutputStream::flush(
77 self,
78 Resource::new_borrow(stream.rep()),
79 )?)
80 }
81
82 fn blocking_flush(&mut self, stream: Resource<OutputStream>) -> StreamResult<()> {
83 in_tokio(async {
84 AsyncHostOutputStream::blocking_flush(self, Resource::new_borrow(stream.rep())).await
85 })
86 }
87
88 fn splice(
89 &mut self,
90 dst: Resource<OutputStream>,
91 src: Resource<InputStream>,
92 len: u64,
93 ) -> StreamResult<u64> {
94 AsyncHostOutputStream::splice(self, dst, src, len)
95 }
96
97 fn blocking_splice(
98 &mut self,
99 dst: Resource<OutputStream>,
100 src: Resource<InputStream>,
101 len: u64,
102 ) -> StreamResult<u64> {
103 in_tokio(async { AsyncHostOutputStream::blocking_splice(self, dst, src, len).await })
104 }
105}
106
107impl<T> streams::HostInputStream for IoImpl<T>
108where
109 T: IoView,
110{
111 fn drop(&mut self, stream: Resource<InputStream>) -> anyhow::Result<()> {
112 in_tokio(async { AsyncHostInputStream::drop(self, stream).await })
113 }
114
115 fn read(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<Vec<u8>> {
116 AsyncHostInputStream::read(self, stream, len)
117 }
118
119 fn blocking_read(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<Vec<u8>> {
120 in_tokio(async { AsyncHostInputStream::blocking_read(self, stream, len).await })
121 }
122
123 fn skip(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<u64> {
124 AsyncHostInputStream::skip(self, stream, len)
125 }
126
127 fn blocking_skip(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<u64> {
128 in_tokio(async { AsyncHostInputStream::blocking_skip(self, stream, len).await })
129 }
130
131 fn subscribe(&mut self, stream: Resource<InputStream>) -> anyhow::Result<Resource<Pollable>> {
132 AsyncHostInputStream::subscribe(self, stream)
133 }
134}