1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
36
37use futures::{future::Ready, prelude::*, ready, stream::SelectAll};
38use libp2p_core::{
39 connection::Endpoint,
40 transport::{ListenerId, TransportError, TransportEvent},
41 Multiaddr, Transport,
42};
43use send_wrapper::SendWrapper;
44use std::{collections::VecDeque, error, fmt, io, mem, pin::Pin, task::Context, task::Poll};
45use wasm_bindgen::{prelude::*, JsCast};
46use wasm_bindgen_futures::JsFuture;
47
/// Raw `wasm_bindgen` declarations of the JavaScript objects that back this transport.
///
/// The Rust code in this file only ever talks to JavaScript through the types
/// and methods declared here.
pub mod ffi {
    use wasm_bindgen::prelude::*;

    #[wasm_bindgen]
    extern "C" {
        /// JavaScript object that can dial and listen; wrapped by `ExtTransport`.
        pub type Transport;
        /// JavaScript object representing one open connection.
        pub type Connection;
        /// JavaScript object describing what happened on a listener
        /// (new addresses, expired addresses, new connections).
        pub type ListenEvent;
        /// JavaScript object describing one incoming connection.
        pub type ConnectionEvent;

        /// Starts dialing `multiaddr`, given in its string form.
        ///
        /// The Rust side awaits the returned `Promise` and converts the value
        /// it resolves to into a `Connection`. If the call throws an `Error`
        /// whose `name` is `"NotSupportedError"`, the Rust side reports the
        /// address as unsupported; any other thrown value is reported as a
        /// generic transport error.
        ///
        /// `_role_override` is `true` when the Rust side dials as a listener.
        #[wasm_bindgen(method, catch)]
        pub fn dial(
            this: &Transport,
            multiaddr: &str,
            _role_override: bool,
        ) -> Result<js_sys::Promise, JsValue>;

        /// Starts listening on `multiaddr`, given in its string form.
        ///
        /// The Rust side treats every value produced by the returned iterator
        /// as a `Promise` resolving to a `ListenEvent`. A thrown `Error` whose
        /// `name` is `"NotSupportedError"` is reported as an unsupported address.
        #[wasm_bindgen(method, catch)]
        pub fn listen_on(this: &Transport, multiaddr: &str) -> Result<js_sys::Iterator, JsValue>;

        /// Returns the iterator used for reading from the connection.
        ///
        /// The Rust side treats each produced value as a `Promise`. A promise
        /// resolving to `null` is reported as end-of-file; anything else is
        /// passed to `Uint8Array::new` and copied out as bytes.
        #[wasm_bindgen(method, getter)]
        pub fn read(this: &Connection) -> js_sys::Iterator;

        /// Writes `data` to the connection.
        ///
        /// The Rust side does not issue another write until the returned
        /// `Promise` has resolved.
        #[wasm_bindgen(method, catch)]
        pub fn write(this: &Connection, data: &[u8]) -> Result<js_sys::Promise, JsValue>;

        /// Called from `poll_close`; a thrown value is turned into an `io::Error`.
        #[wasm_bindgen(method, catch)]
        pub fn shutdown(this: &Connection) -> Result<(), JsValue>;

        /// Called exactly once, when the Rust-side `Connection` is dropped.
        #[wasm_bindgen(method)]
        pub fn close(this: &Connection);

        /// Addresses the listener started listening on, if any.
        /// Each element is expected to be a multiaddr string.
        #[wasm_bindgen(method, getter)]
        pub fn new_addrs(this: &ListenEvent) -> Option<Box<[JsValue]>>;

        /// Addresses the listener stopped listening on, if any.
        /// Each element is expected to be a multiaddr string.
        #[wasm_bindgen(method, getter)]
        pub fn expired_addrs(this: &ListenEvent) -> Option<Box<[JsValue]>>;

        /// Newly accepted connections, if any.
        /// Each element is converted into a `ConnectionEvent`.
        #[wasm_bindgen(method, getter)]
        pub fn new_connections(this: &ListenEvent) -> Option<Box<[JsValue]>>;

        /// NOTE(review): not read anywhere in this file; presumably a promise
        /// for the following event. Confirm against the JavaScript side.
        #[wasm_bindgen(method, getter)]
        pub fn next_event(this: &ListenEvent) -> JsValue;

        /// The newly opened connection itself.
        #[wasm_bindgen(method, getter)]
        pub fn connection(this: &ConnectionEvent) -> Connection;

        /// Multiaddr string parsed by the Rust side into the `send_back_addr`
        /// of the incoming-connection event.
        #[wasm_bindgen(method, getter)]
        pub fn observed_addr(this: &ConnectionEvent) -> String;

        /// Multiaddr string parsed by the Rust side into the `local_addr`
        /// of the incoming-connection event.
        #[wasm_bindgen(method, getter)]
        pub fn local_addr(this: &ConnectionEvent) -> String;
    }

    #[cfg(feature = "websocket")]
    #[wasm_bindgen(module = "/src/websockets.js")]
    extern "C" {
        /// Returns the `Transport` exported by `/src/websockets.js`.
        /// Only available with the `websocket` feature.
        pub fn websocket_transport() -> Transport;
    }
}
148
/// `Transport` implementation whose dialing and listening are delegated to a
/// JavaScript object through the `ffi` bindings.
pub struct ExtTransport {
    /// The JavaScript transport object that performs the actual work.
    inner: SendWrapper<ffi::Transport>,
    /// One event stream per listener started with `listen_on`, polled together.
    listeners: SelectAll<Listen>,
}
154
155impl ExtTransport {
156 pub fn new(transport: ffi::Transport) -> Self {
158 ExtTransport {
159 inner: SendWrapper::new(transport),
160 listeners: SelectAll::new(),
161 }
162 }
163
164 fn do_dial(
165 &mut self,
166 addr: Multiaddr,
167 role_override: Endpoint,
168 ) -> Result<<Self as Transport>::Dial, TransportError<<Self as Transport>::Error>> {
169 let promise = self
170 .inner
171 .dial(
172 &addr.to_string(),
173 matches!(role_override, Endpoint::Listener),
174 )
175 .map_err(|err| {
176 if is_not_supported_error(&err) {
177 TransportError::MultiaddrNotSupported(addr)
178 } else {
179 TransportError::Other(JsErr::from(err))
180 }
181 })?;
182
183 Ok(Dial {
184 inner: SendWrapper::new(promise.into()),
185 })
186 }
187}
188
189impl fmt::Debug for ExtTransport {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 f.debug_tuple("ExtTransport").finish()
192 }
193}
194
195impl Transport for ExtTransport {
196 type Output = Connection;
197 type Error = JsErr;
198 type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
199 type Dial = Dial;
200
201 fn listen_on(
202 &mut self,
203 listener_id: ListenerId,
204 addr: Multiaddr,
205 ) -> Result<(), TransportError<Self::Error>> {
206 let iter = self.inner.listen_on(&addr.to_string()).map_err(|err| {
207 if is_not_supported_error(&err) {
208 TransportError::MultiaddrNotSupported(addr)
209 } else {
210 TransportError::Other(JsErr::from(err))
211 }
212 })?;
213 let listen = Listen {
214 listener_id,
215 iterator: SendWrapper::new(iter),
216 next_event: None,
217 pending_events: VecDeque::new(),
218 is_closed: false,
219 };
220 self.listeners.push(listen);
221 Ok(())
222 }
223
224 fn remove_listener(&mut self, id: ListenerId) -> bool {
225 match self.listeners.iter_mut().find(|l| l.listener_id == id) {
226 Some(listener) => {
227 listener.close(Ok(()));
228 true
229 }
230 None => false,
231 }
232 }
233
234 fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
235 self.do_dial(addr, Endpoint::Dialer)
236 }
237
238 fn dial_as_listener(
239 &mut self,
240 addr: Multiaddr,
241 ) -> Result<Self::Dial, TransportError<Self::Error>> {
242 self.do_dial(addr, Endpoint::Listener)
243 }
244
245 fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
246 None
247 }
248
249 fn poll(
250 mut self: Pin<&mut Self>,
251 cx: &mut Context<'_>,
252 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
253 match ready!(self.listeners.poll_next_unpin(cx)) {
254 Some(event) => Poll::Ready(event),
255 None => Poll::Pending,
256 }
257 }
258}
259
/// Future returned by dialing. Resolves to a `Connection` once the JavaScript
/// promise obtained from `ffi::Transport::dial` resolves.
#[must_use = "futures do nothing unless polled"]
pub struct Dial {
    /// The JavaScript dial promise, converted into a Rust future.
    inner: SendWrapper<JsFuture>,
}
266
267impl fmt::Debug for Dial {
268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269 f.debug_tuple("Dial").finish()
270 }
271}
272
273impl Future for Dial {
274 type Output = Result<Connection, JsErr>;
275
276 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
277 match Future::poll(Pin::new(&mut *self.inner), cx) {
278 Poll::Ready(Ok(connec)) => Poll::Ready(Ok(Connection::new(connec.into()))),
279 Poll::Pending => Poll::Pending,
280 Poll::Ready(Err(err)) => Poll::Ready(Err(JsErr::from(err))),
281 }
282 }
283}
284
/// Stream of `TransportEvent`s for a single listener, driven by the
/// JavaScript iterator returned from `ffi::Transport::listen_on`.
#[must_use = "futures do nothing unless polled"]
pub struct Listen {
    /// Id attached to every event this listener produces.
    listener_id: ListenerId,
    /// JavaScript iterator whose values are treated as promises of `ffi::ListenEvent`.
    iterator: SendWrapper<js_sys::Iterator>,
    /// Promise currently being awaited, if one has been pulled from `iterator`.
    next_event: Option<SendWrapper<JsFuture>>,
    /// Events already decoded but not yet returned from `poll_next`.
    pending_events: VecDeque<<Self as Stream>::Item>,
    /// Set by `close`; once `pending_events` is drained the stream ends.
    is_closed: bool,
}
298
299impl Listen {
300 fn close(&mut self, reason: Result<(), JsErr>) {
302 self.pending_events
303 .push_back(TransportEvent::ListenerClosed {
304 listener_id: self.listener_id,
305 reason,
306 });
307 self.is_closed = true;
308 }
309}
310
311impl fmt::Debug for Listen {
312 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313 f.debug_tuple("Listen").field(&self.listener_id).finish()
314 }
315}
316
317impl Stream for Listen {
318 type Item = TransportEvent<<ExtTransport as Transport>::ListenerUpgrade, JsErr>;
319
320 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
321 loop {
322 if let Some(ev) = self.pending_events.pop_front() {
323 return Poll::Ready(Some(ev));
324 }
325
326 if self.is_closed {
327 return Poll::Ready(None);
329 }
330
331 if self.next_event.is_none() {
334 if let Ok(ev) = self.iterator.next() {
335 if !ev.done() {
336 let promise: js_sys::Promise = ev.value().into();
337 self.next_event = Some(SendWrapper::new(promise.into()));
338 }
339 }
340 }
341
342 let event = if let Some(next_event) = self.next_event.as_mut() {
343 let e = match Future::poll(Pin::new(&mut **next_event), cx) {
344 Poll::Ready(Ok(ev)) => ffi::ListenEvent::from(ev),
345 Poll::Pending => return Poll::Pending,
346 Poll::Ready(Err(err)) => {
347 self.close(Err(err.into()));
348 continue;
349 }
350 };
351 self.next_event = None;
352 e
353 } else {
354 self.close(Ok(()));
355 continue;
356 };
357
358 let listener_id = self.listener_id;
359
360 if let Some(addrs) = event.new_addrs() {
361 for addr in addrs.iter() {
362 match js_value_to_addr(addr) {
363 Ok(addr) => self.pending_events.push_back(TransportEvent::NewAddress {
364 listener_id,
365 listen_addr: addr,
366 }),
367 Err(err) => self
368 .pending_events
369 .push_back(TransportEvent::ListenerError {
370 listener_id,
371 error: err,
372 }),
373 };
374 }
375 }
376
377 if let Some(upgrades) = event.new_connections() {
378 for upgrade in upgrades.iter().cloned() {
379 let upgrade: ffi::ConnectionEvent = upgrade.into();
380 match upgrade.local_addr().parse().and_then(|local| {
381 let observed = upgrade.observed_addr().parse()?;
382 Ok((local, observed))
383 }) {
384 Ok((local_addr, send_back_addr)) => {
385 self.pending_events.push_back(TransportEvent::Incoming {
386 listener_id,
387 local_addr,
388 send_back_addr,
389 upgrade: futures::future::ok(Connection::new(upgrade.connection())),
390 })
391 }
392 Err(err) => self
393 .pending_events
394 .push_back(TransportEvent::ListenerError {
395 listener_id,
396 error: err.into(),
397 }),
398 }
399 }
400 }
401
402 if let Some(addrs) = event.expired_addrs() {
403 for addr in addrs.iter() {
404 match js_value_to_addr(addr) {
405 Ok(addr) => self
406 .pending_events
407 .push_back(TransportEvent::AddressExpired {
408 listener_id,
409 listen_addr: addr,
410 }),
411 Err(err) => self
412 .pending_events
413 .push_back(TransportEvent::ListenerError {
414 listener_id,
415 error: err,
416 }),
417 }
418 }
419 }
420 }
421 }
422}
423
/// Active connection backed by a JavaScript `ffi::Connection` object.
///
/// Implements `AsyncRead` and `AsyncWrite`; the JavaScript object's `close`
/// method is called when this value is dropped.
pub struct Connection {
    /// The JavaScript connection object.
    inner: SendWrapper<ffi::Connection>,

    /// Iterator obtained from `inner.read()` when the connection was created.
    read_iterator: SendWrapper<js_sys::Iterator>,

    /// Current state of the reading side.
    read_state: ConnectionReadState,

    /// Promise returned by the most recent `write` call, if it has not yet been
    /// observed as resolved. It must resolve before another write is issued.
    previous_write_promise: Option<SendWrapper<JsFuture>>,
}
444
445impl Connection {
446 fn new(inner: ffi::Connection) -> Self {
448 let read_iterator = inner.read();
449
450 Connection {
451 inner: SendWrapper::new(inner),
452 read_iterator: SendWrapper::new(read_iterator),
453 read_state: ConnectionReadState::PendingData(Vec::new()),
454 previous_write_promise: None,
455 }
456 }
457}
458
/// State of the reading side of a `Connection`.
enum ConnectionReadState {
    /// Bytes received from JavaScript but not yet handed to the reader.
    /// An empty buffer means the next promise must be pulled from the iterator.
    PendingData(Vec<u8>),
    /// Waiting for this promise to resolve to the next chunk of data.
    Waiting(SendWrapper<JsFuture>),
    /// Reading is over; `poll_read` reports `BrokenPipe` in this state. Also
    /// used as the placeholder while `poll_read` takes the state out.
    Finished,
}
468
469impl fmt::Debug for Connection {
470 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
471 f.debug_tuple("Connection").finish()
472 }
473}
474
475impl AsyncRead for Connection {
476 fn poll_read(
477 mut self: Pin<&mut Self>,
478 cx: &mut Context<'_>,
479 buf: &mut [u8],
480 ) -> Poll<Result<usize, io::Error>> {
481 loop {
482 match mem::replace(&mut self.read_state, ConnectionReadState::Finished) {
483 ConnectionReadState::Finished => {
484 break Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
485 }
486
487 ConnectionReadState::PendingData(ref data) if data.is_empty() => {
488 let iter_next = self.read_iterator.next().map_err(JsErr::from)?;
489 if iter_next.done() {
490 self.read_state = ConnectionReadState::Finished;
491 } else {
492 let promise: js_sys::Promise = iter_next.value().into();
493 let promise = SendWrapper::new(promise.into());
494 self.read_state = ConnectionReadState::Waiting(promise);
495 }
496 continue;
497 }
498
499 ConnectionReadState::PendingData(mut data) => {
500 debug_assert!(!data.is_empty());
501 if buf.len() <= data.len() {
502 buf.copy_from_slice(&data[..buf.len()]);
503 self.read_state =
504 ConnectionReadState::PendingData(data.split_off(buf.len()));
505 break Poll::Ready(Ok(buf.len()));
506 } else {
507 let len = data.len();
508 buf[..len].copy_from_slice(&data);
509 self.read_state = ConnectionReadState::PendingData(Vec::new());
510 break Poll::Ready(Ok(len));
511 }
512 }
513
514 ConnectionReadState::Waiting(mut promise) => {
515 let data = match Future::poll(Pin::new(&mut *promise), cx) {
516 Poll::Ready(Ok(ref data)) if data.is_null() => break Poll::Ready(Ok(0)),
517 Poll::Ready(Ok(data)) => data,
518 Poll::Ready(Err(err)) => {
519 break Poll::Ready(Err(io::Error::from(JsErr::from(err))))
520 }
521 Poll::Pending => {
522 self.read_state = ConnectionReadState::Waiting(promise);
523 break Poll::Pending;
524 }
525 };
526
527 let data = js_sys::Uint8Array::new(&data);
530 let data_len = data.length() as usize;
531 if data_len <= buf.len() {
532 data.copy_to(&mut buf[..data_len]);
533 self.read_state = ConnectionReadState::PendingData(Vec::new());
534 break Poll::Ready(Ok(data_len));
535 } else {
536 let mut tmp_buf = vec![0; data_len];
537 data.copy_to(&mut tmp_buf[..]);
538 self.read_state = ConnectionReadState::PendingData(tmp_buf);
539 continue;
540 }
541 }
542 }
543 }
544 }
545}
546
547impl AsyncWrite for Connection {
548 fn poll_write(
549 mut self: Pin<&mut Self>,
550 cx: &mut Context<'_>,
551 buf: &[u8],
552 ) -> Poll<Result<usize, io::Error>> {
553 if let Some(mut promise) = self.previous_write_promise.take() {
557 match Future::poll(Pin::new(&mut *promise), cx) {
558 Poll::Ready(Ok(_)) => (),
559 Poll::Ready(Err(err)) => {
560 return Poll::Ready(Err(io::Error::from(JsErr::from(err))))
561 }
562 Poll::Pending => {
563 self.previous_write_promise = Some(promise);
564 return Poll::Pending;
565 }
566 }
567 }
568
569 debug_assert!(self.previous_write_promise.is_none());
570 self.previous_write_promise = Some(SendWrapper::new(
571 self.inner.write(buf).map_err(JsErr::from)?.into(),
572 ));
573 Poll::Ready(Ok(buf.len()))
574 }
575
576 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
577 Poll::Ready(Ok(()))
579 }
580
581 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
582 match self.inner.shutdown() {
584 Ok(()) => Poll::Ready(Ok(())),
585 Err(err) => Poll::Ready(Err(io::Error::from(JsErr::from(err)))),
586 }
587 }
588}
589
590impl Drop for Connection {
591 fn drop(&mut self) {
592 self.inner.close();
593 }
594}
595
596fn is_not_supported_error(err: &JsValue) -> bool {
598 if let Some(err) = err.dyn_ref::<js_sys::Error>() {
599 err.name() == "NotSupportedError"
600 } else {
601 false
602 }
603}
604
605fn js_value_to_addr(addr: &JsValue) -> Result<Multiaddr, JsErr> {
607 if let Some(addr) = addr.as_string() {
608 Ok(addr.parse()?)
609 } else {
610 Err(JsValue::from_str("Element in new_addrs is not a string").into())
611 }
612}
613
/// Error type of this transport: an arbitrary JavaScript value that was thrown
/// or used as a promise rejection, or a message built on the Rust side.
pub struct JsErr(SendWrapper<JsValue>);
616
617impl From<JsValue> for JsErr {
618 fn from(val: JsValue) -> JsErr {
619 JsErr(SendWrapper::new(val))
620 }
621}
622
623impl From<libp2p_core::multiaddr::Error> for JsErr {
624 fn from(err: libp2p_core::multiaddr::Error) -> JsErr {
625 JsValue::from_str(&err.to_string()).into()
626 }
627}
628
629impl From<JsErr> for io::Error {
630 fn from(err: JsErr) -> io::Error {
631 io::Error::new(io::ErrorKind::Other, err.to_string())
632 }
633}
634
635impl fmt::Debug for JsErr {
636 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
637 write!(f, "{self}")
638 }
639}
640
641impl fmt::Display for JsErr {
642 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
643 if let Some(s) = self.0.as_string() {
644 write!(f, "{s}")
645 } else if let Some(err) = self.0.dyn_ref::<js_sys::Error>() {
646 write!(f, "{}", String::from(err.message()))
647 } else if let Some(obj) = self.0.dyn_ref::<js_sys::Object>() {
648 write!(f, "{}", String::from(obj.to_string()))
649 } else {
650 write!(f, "{:?}", &*self.0)
651 }
652 }
653}
654
// Marker impl: `JsErr` has no source error and relies on the trait's default
// methods, using the `Debug` and `Display` impls defined for it in this file.
impl error::Error for JsErr {}