1use crate::network::SocketAddrUse;
2use crate::{
3 bindings::{
4 sockets::network::{IpAddressFamily, IpSocketAddress, Network},
5 sockets::tcp::{self, ShutdownType},
6 },
7 network::SocketAddressFamily,
8};
9use crate::{SocketResult, WasiImpl, WasiView};
10use std::net::SocketAddr;
11use std::time::Duration;
12use wasmtime::component::Resource;
13use wasmtime_wasi_io::{
14 poll::DynPollable,
15 streams::{DynInputStream, DynOutputStream},
16 IoView,
17};
18
19impl<T> tcp::Host for WasiImpl<T> where T: WasiView {}
20
21impl<T> crate::host::tcp::tcp::HostTcpSocket for WasiImpl<T>
22where
23 T: WasiView,
24{
25 async fn start_bind(
26 &mut self,
27 this: Resource<tcp::TcpSocket>,
28 network: Resource<Network>,
29 local_address: IpSocketAddress,
30 ) -> SocketResult<()> {
31 self.ctx().allowed_network_uses.check_allowed_tcp()?;
32 let table = self.table();
33 let network = table.get(&network)?;
34 let local_address: SocketAddr = local_address.into();
35
36 network
38 .check_socket_addr(local_address, SocketAddrUse::TcpBind)
39 .await?;
40
41 table.get_mut(&this)?.start_bind(local_address)?;
43
44 Ok(())
45 }
46
47 fn finish_bind(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<()> {
48 let table = self.table();
49 let socket = table.get_mut(&this)?;
50
51 socket.finish_bind()
52 }
53
54 async fn start_connect(
55 &mut self,
56 this: Resource<tcp::TcpSocket>,
57 network: Resource<Network>,
58 remote_address: IpSocketAddress,
59 ) -> SocketResult<()> {
60 self.ctx().allowed_network_uses.check_allowed_tcp()?;
61 let table = self.table();
62 let network = table.get(&network)?;
63 let remote_address: SocketAddr = remote_address.into();
64
65 network
67 .check_socket_addr(remote_address, SocketAddrUse::TcpConnect)
68 .await?;
69
70 table.get_mut(&this)?.start_connect(remote_address)?;
72
73 Ok(())
74 }
75
76 fn finish_connect(
77 &mut self,
78 this: Resource<tcp::TcpSocket>,
79 ) -> SocketResult<(Resource<DynInputStream>, Resource<DynOutputStream>)> {
80 let table = self.table();
81 let socket = table.get_mut(&this)?;
82
83 let (input, output) = socket.finish_connect()?;
84
85 let input_stream = self.table().push_child(input, &this)?;
86 let output_stream = self.table().push_child(output, &this)?;
87
88 Ok((input_stream, output_stream))
89 }
90
91 fn start_listen(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<()> {
92 self.ctx().allowed_network_uses.check_allowed_tcp()?;
93 let table = self.table();
94 let socket = table.get_mut(&this)?;
95
96 socket.start_listen()
97 }
98
99 fn finish_listen(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<()> {
100 let table = self.table();
101 let socket = table.get_mut(&this)?;
102 socket.finish_listen()
103 }
104
105 fn accept(
106 &mut self,
107 this: Resource<tcp::TcpSocket>,
108 ) -> SocketResult<(
109 Resource<tcp::TcpSocket>,
110 Resource<DynInputStream>,
111 Resource<DynOutputStream>,
112 )> {
113 self.ctx().allowed_network_uses.check_allowed_tcp()?;
114 let table = self.table();
115 let socket = table.get_mut(&this)?;
116
117 let (tcp_socket, input, output) = socket.accept()?;
118
119 let tcp_socket = self.table().push(tcp_socket)?;
120 let input_stream = self.table().push_child(input, &tcp_socket)?;
121 let output_stream = self.table().push_child(output, &tcp_socket)?;
122
123 Ok((tcp_socket, input_stream, output_stream))
124 }
125
126 fn local_address(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<IpSocketAddress> {
127 let table = self.table();
128 let socket = table.get(&this)?;
129
130 socket.local_address().map(Into::into)
131 }
132
133 fn remote_address(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<IpSocketAddress> {
134 let table = self.table();
135 let socket = table.get(&this)?;
136
137 socket.remote_address().map(Into::into)
138 }
139
140 fn is_listening(&mut self, this: Resource<tcp::TcpSocket>) -> Result<bool, anyhow::Error> {
141 let table = self.table();
142 let socket = table.get(&this)?;
143
144 Ok(socket.is_listening())
145 }
146
147 fn address_family(
148 &mut self,
149 this: Resource<tcp::TcpSocket>,
150 ) -> Result<IpAddressFamily, anyhow::Error> {
151 let table = self.table();
152 let socket = table.get(&this)?;
153
154 match socket.address_family() {
155 SocketAddressFamily::Ipv4 => Ok(IpAddressFamily::Ipv4),
156 SocketAddressFamily::Ipv6 => Ok(IpAddressFamily::Ipv6),
157 }
158 }
159
160 fn set_listen_backlog_size(
161 &mut self,
162 this: Resource<tcp::TcpSocket>,
163 value: u64,
164 ) -> SocketResult<()> {
165 let table = self.table();
166 let socket = table.get_mut(&this)?;
167
168 let value = value.try_into().unwrap_or(u32::MAX);
170
171 socket.set_listen_backlog_size(value)
172 }
173
174 fn keep_alive_enabled(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<bool> {
175 let table = self.table();
176 let socket = table.get(&this)?;
177 socket.keep_alive_enabled()
178 }
179
180 fn set_keep_alive_enabled(
181 &mut self,
182 this: Resource<tcp::TcpSocket>,
183 value: bool,
184 ) -> SocketResult<()> {
185 let table = self.table();
186 let socket = table.get(&this)?;
187 socket.set_keep_alive_enabled(value)
188 }
189
190 fn keep_alive_idle_time(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
191 let table = self.table();
192 let socket = table.get(&this)?;
193 Ok(socket.keep_alive_idle_time()?.as_nanos() as u64)
194 }
195
196 fn set_keep_alive_idle_time(
197 &mut self,
198 this: Resource<tcp::TcpSocket>,
199 value: u64,
200 ) -> SocketResult<()> {
201 let table = self.table();
202 let socket = table.get_mut(&this)?;
203 let duration = Duration::from_nanos(value);
204 socket.set_keep_alive_idle_time(duration)
205 }
206
207 fn keep_alive_interval(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
208 let table = self.table();
209 let socket = table.get(&this)?;
210 Ok(socket.keep_alive_interval()?.as_nanos() as u64)
211 }
212
213 fn set_keep_alive_interval(
214 &mut self,
215 this: Resource<tcp::TcpSocket>,
216 value: u64,
217 ) -> SocketResult<()> {
218 let table = self.table();
219 let socket = table.get(&this)?;
220 socket.set_keep_alive_interval(Duration::from_nanos(value))
221 }
222
223 fn keep_alive_count(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u32> {
224 let table = self.table();
225 let socket = table.get(&this)?;
226 socket.keep_alive_count()
227 }
228
229 fn set_keep_alive_count(
230 &mut self,
231 this: Resource<tcp::TcpSocket>,
232 value: u32,
233 ) -> SocketResult<()> {
234 let table = self.table();
235 let socket = table.get(&this)?;
236 socket.set_keep_alive_count(value)
237 }
238
239 fn hop_limit(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u8> {
240 let table = self.table();
241 let socket = table.get(&this)?;
242 socket.hop_limit()
243 }
244
245 fn set_hop_limit(&mut self, this: Resource<tcp::TcpSocket>, value: u8) -> SocketResult<()> {
246 let table = self.table();
247 let socket = table.get_mut(&this)?;
248 socket.set_hop_limit(value)
249 }
250
251 fn receive_buffer_size(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
252 let table = self.table();
253 let socket = table.get(&this)?;
254
255 Ok(socket.receive_buffer_size()? as u64)
256 }
257
258 fn set_receive_buffer_size(
259 &mut self,
260 this: Resource<tcp::TcpSocket>,
261 value: u64,
262 ) -> SocketResult<()> {
263 let table = self.table();
264 let socket = table.get_mut(&this)?;
265 let value = value.try_into().unwrap_or(usize::MAX);
266 socket.set_receive_buffer_size(value)
267 }
268
269 fn send_buffer_size(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
270 let table = self.table();
271 let socket = table.get(&this)?;
272
273 Ok(socket.send_buffer_size()? as u64)
274 }
275
276 fn set_send_buffer_size(
277 &mut self,
278 this: Resource<tcp::TcpSocket>,
279 value: u64,
280 ) -> SocketResult<()> {
281 let table = self.table();
282 let socket = table.get_mut(&this)?;
283 let value = value.try_into().unwrap_or(usize::MAX);
284 socket.set_send_buffer_size(value)
285 }
286
287 fn subscribe(
288 &mut self,
289 this: Resource<tcp::TcpSocket>,
290 ) -> anyhow::Result<Resource<DynPollable>> {
291 wasmtime_wasi_io::poll::subscribe(self.table(), this)
292 }
293
294 fn shutdown(
295 &mut self,
296 this: Resource<tcp::TcpSocket>,
297 shutdown_type: ShutdownType,
298 ) -> SocketResult<()> {
299 let table = self.table();
300 let socket = table.get(&this)?;
301
302 let how = match shutdown_type {
303 ShutdownType::Receive => std::net::Shutdown::Read,
304 ShutdownType::Send => std::net::Shutdown::Write,
305 ShutdownType::Both => std::net::Shutdown::Both,
306 };
307 socket.shutdown(how)
308 }
309
310 fn drop(&mut self, this: Resource<tcp::TcpSocket>) -> Result<(), anyhow::Error> {
311 let table = self.table();
312
313 let dropped = table.delete(this)?;
316 drop(dropped);
317
318 Ok(())
319 }
320}
321
322pub mod sync {
323 use wasmtime::component::Resource;
324
325 use crate::{
326 bindings::{
327 sockets::{
328 network::Network,
329 tcp::{self as async_tcp, HostTcpSocket as AsyncHostTcpSocket},
330 },
331 sync::sockets::tcp::{
332 self, Duration, HostTcpSocket, InputStream, IpAddressFamily, IpSocketAddress,
333 OutputStream, Pollable, ShutdownType, TcpSocket,
334 },
335 },
336 runtime::in_tokio,
337 SocketError, WasiImpl, WasiView,
338 };
339
340 impl<T> tcp::Host for WasiImpl<T> where T: WasiView {}
341
342 impl<T> HostTcpSocket for WasiImpl<T>
343 where
344 T: WasiView,
345 {
346 fn start_bind(
347 &mut self,
348 self_: Resource<TcpSocket>,
349 network: Resource<Network>,
350 local_address: IpSocketAddress,
351 ) -> Result<(), SocketError> {
352 in_tokio(async {
353 AsyncHostTcpSocket::start_bind(self, self_, network, local_address).await
354 })
355 }
356
357 fn finish_bind(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
358 AsyncHostTcpSocket::finish_bind(self, self_)
359 }
360
361 fn start_connect(
362 &mut self,
363 self_: Resource<TcpSocket>,
364 network: Resource<Network>,
365 remote_address: IpSocketAddress,
366 ) -> Result<(), SocketError> {
367 in_tokio(async {
368 AsyncHostTcpSocket::start_connect(self, self_, network, remote_address).await
369 })
370 }
371
372 fn finish_connect(
373 &mut self,
374 self_: Resource<TcpSocket>,
375 ) -> Result<(Resource<InputStream>, Resource<OutputStream>), SocketError> {
376 AsyncHostTcpSocket::finish_connect(self, self_)
377 }
378
379 fn start_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
380 AsyncHostTcpSocket::start_listen(self, self_)
381 }
382
383 fn finish_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
384 AsyncHostTcpSocket::finish_listen(self, self_)
385 }
386
387 fn accept(
388 &mut self,
389 self_: Resource<TcpSocket>,
390 ) -> Result<
391 (
392 Resource<TcpSocket>,
393 Resource<InputStream>,
394 Resource<OutputStream>,
395 ),
396 SocketError,
397 > {
398 AsyncHostTcpSocket::accept(self, self_)
399 }
400
401 fn local_address(
402 &mut self,
403 self_: Resource<TcpSocket>,
404 ) -> Result<IpSocketAddress, SocketError> {
405 AsyncHostTcpSocket::local_address(self, self_)
406 }
407
408 fn remote_address(
409 &mut self,
410 self_: Resource<TcpSocket>,
411 ) -> Result<IpSocketAddress, SocketError> {
412 AsyncHostTcpSocket::remote_address(self, self_)
413 }
414
415 fn is_listening(&mut self, self_: Resource<TcpSocket>) -> wasmtime::Result<bool> {
416 AsyncHostTcpSocket::is_listening(self, self_)
417 }
418
419 fn address_family(
420 &mut self,
421 self_: Resource<TcpSocket>,
422 ) -> wasmtime::Result<IpAddressFamily> {
423 AsyncHostTcpSocket::address_family(self, self_)
424 }
425
426 fn set_listen_backlog_size(
427 &mut self,
428 self_: Resource<TcpSocket>,
429 value: u64,
430 ) -> Result<(), SocketError> {
431 AsyncHostTcpSocket::set_listen_backlog_size(self, self_, value)
432 }
433
434 fn keep_alive_enabled(&mut self, self_: Resource<TcpSocket>) -> Result<bool, SocketError> {
435 AsyncHostTcpSocket::keep_alive_enabled(self, self_)
436 }
437
438 fn set_keep_alive_enabled(
439 &mut self,
440 self_: Resource<TcpSocket>,
441 value: bool,
442 ) -> Result<(), SocketError> {
443 AsyncHostTcpSocket::set_keep_alive_enabled(self, self_, value)
444 }
445
446 fn keep_alive_idle_time(
447 &mut self,
448 self_: Resource<TcpSocket>,
449 ) -> Result<Duration, SocketError> {
450 AsyncHostTcpSocket::keep_alive_idle_time(self, self_)
451 }
452
453 fn set_keep_alive_idle_time(
454 &mut self,
455 self_: Resource<TcpSocket>,
456 value: Duration,
457 ) -> Result<(), SocketError> {
458 AsyncHostTcpSocket::set_keep_alive_idle_time(self, self_, value)
459 }
460
461 fn keep_alive_interval(
462 &mut self,
463 self_: Resource<TcpSocket>,
464 ) -> Result<Duration, SocketError> {
465 AsyncHostTcpSocket::keep_alive_interval(self, self_)
466 }
467
468 fn set_keep_alive_interval(
469 &mut self,
470 self_: Resource<TcpSocket>,
471 value: Duration,
472 ) -> Result<(), SocketError> {
473 AsyncHostTcpSocket::set_keep_alive_interval(self, self_, value)
474 }
475
476 fn keep_alive_count(&mut self, self_: Resource<TcpSocket>) -> Result<u32, SocketError> {
477 AsyncHostTcpSocket::keep_alive_count(self, self_)
478 }
479
480 fn set_keep_alive_count(
481 &mut self,
482 self_: Resource<TcpSocket>,
483 value: u32,
484 ) -> Result<(), SocketError> {
485 AsyncHostTcpSocket::set_keep_alive_count(self, self_, value)
486 }
487
488 fn hop_limit(&mut self, self_: Resource<TcpSocket>) -> Result<u8, SocketError> {
489 AsyncHostTcpSocket::hop_limit(self, self_)
490 }
491
492 fn set_hop_limit(
493 &mut self,
494 self_: Resource<TcpSocket>,
495 value: u8,
496 ) -> Result<(), SocketError> {
497 AsyncHostTcpSocket::set_hop_limit(self, self_, value)
498 }
499
500 fn receive_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
501 AsyncHostTcpSocket::receive_buffer_size(self, self_)
502 }
503
504 fn set_receive_buffer_size(
505 &mut self,
506 self_: Resource<TcpSocket>,
507 value: u64,
508 ) -> Result<(), SocketError> {
509 AsyncHostTcpSocket::set_receive_buffer_size(self, self_, value)
510 }
511
512 fn send_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
513 AsyncHostTcpSocket::send_buffer_size(self, self_)
514 }
515
516 fn set_send_buffer_size(
517 &mut self,
518 self_: Resource<TcpSocket>,
519 value: u64,
520 ) -> Result<(), SocketError> {
521 AsyncHostTcpSocket::set_send_buffer_size(self, self_, value)
522 }
523
524 fn subscribe(
525 &mut self,
526 self_: Resource<TcpSocket>,
527 ) -> wasmtime::Result<Resource<Pollable>> {
528 AsyncHostTcpSocket::subscribe(self, self_)
529 }
530
531 fn shutdown(
532 &mut self,
533 self_: Resource<TcpSocket>,
534 shutdown_type: ShutdownType,
535 ) -> Result<(), SocketError> {
536 AsyncHostTcpSocket::shutdown(self, self_, shutdown_type.into())
537 }
538
539 fn drop(&mut self, rep: Resource<TcpSocket>) -> wasmtime::Result<()> {
540 AsyncHostTcpSocket::drop(self, rep)
541 }
542 }
543
544 impl From<ShutdownType> for async_tcp::ShutdownType {
545 fn from(other: ShutdownType) -> Self {
546 match other {
547 ShutdownType::Receive => async_tcp::ShutdownType::Receive,
548 ShutdownType::Send => async_tcp::ShutdownType::Send,
549 ShutdownType::Both => async_tcp::ShutdownType::Both,
550 }
551 }
552 }
553}