libp2p_wasm_ext/
lib.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the libp2p `Transport` trait for external transports.
22//!
23//! This `Transport` is used in the context of WASM to allow delegating the transport mechanism
24//! to the code that uses rust-libp2p, as opposed to inside of rust-libp2p itself.
25//!
26//! > **Note**: This only allows transports that produce a raw stream with the remote. You
27//! >           couldn't, for example, pass an implementation QUIC.
28//!
29//! # Usage
30//!
31//! Call `new()` with a JavaScript object that implements the interface described in the `ffi`
32//! module.
33//!
34
35#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
36
37use futures::{future::Ready, prelude::*, ready, stream::SelectAll};
38use libp2p_core::{
39    connection::Endpoint,
40    transport::{ListenerId, TransportError, TransportEvent},
41    Multiaddr, Transport,
42};
43use send_wrapper::SendWrapper;
44use std::{collections::VecDeque, error, fmt, io, mem, pin::Pin, task::Context, task::Poll};
45use wasm_bindgen::{prelude::*, JsCast};
46use wasm_bindgen_futures::JsFuture;
47
48/// Contains the definition that one must match on the JavaScript side.
49pub mod ffi {
50    use wasm_bindgen::prelude::*;
51
52    #[wasm_bindgen]
53    extern "C" {
54        /// Type of the object that allows opening connections.
55        pub type Transport;
56        /// Type of the object that represents an open connection with a remote.
57        pub type Connection;
58        /// Type of the object that represents an event generated by listening.
59        pub type ListenEvent;
60        /// Type of the object that represents an event containing a new connection with a remote.
61        pub type ConnectionEvent;
62
63        /// Start attempting to dial the given multiaddress.
64        ///
65        /// The returned `Promise` must yield a [`Connection`] on success.
66        ///
67        /// If the multiaddress is not supported, you should return an instance of `Error` whose
68        /// `name` property has been set to the string `"NotSupportedError"`.
69        #[wasm_bindgen(method, catch)]
70        pub fn dial(
71            this: &Transport,
72            multiaddr: &str,
73            _role_override: bool,
74        ) -> Result<js_sys::Promise, JsValue>;
75
76        /// Start listening on the given multiaddress.
77        ///
78        /// The returned `Iterator` must yield `Promise`s to [`ListenEvent`] events.
79        ///
80        /// If the multiaddress is not supported, you should return an instance of `Error` whose
81        /// `name` property has been set to the string `"NotSupportedError"`.
82        #[wasm_bindgen(method, catch)]
83        pub fn listen_on(this: &Transport, multiaddr: &str) -> Result<js_sys::Iterator, JsValue>;
84
85        /// Returns an iterator of JavaScript `Promise`s that resolve to `ArrayBuffer` objects
86        /// (or resolve to null, see below). These `ArrayBuffer` objects contain the data that the
87        /// remote has sent to us. If the remote closes the connection, the iterator must produce
88        /// a `Promise` that resolves to `null`.
89        #[wasm_bindgen(method, getter)]
90        pub fn read(this: &Connection) -> js_sys::Iterator;
91
92        /// Writes data to the connection. Returns a `Promise` that resolves when the connection is
93        /// ready for writing again.
94        ///
95        /// If the `Promise` produces an error, the writing side of the connection is considered
96        /// unrecoverable and the connection should be closed as soon as possible.
97        ///
98        /// Guaranteed to only be called after the previous write promise has resolved.
99        #[wasm_bindgen(method, catch)]
100        pub fn write(this: &Connection, data: &[u8]) -> Result<js_sys::Promise, JsValue>;
101
102        /// Shuts down the writing side of the connection. After this has been called, the `write`
103        /// method will no longer be called.
104        #[wasm_bindgen(method, catch)]
105        pub fn shutdown(this: &Connection) -> Result<(), JsValue>;
106
107        /// Closes the connection. No other method will be called on this connection anymore.
108        #[wasm_bindgen(method)]
109        pub fn close(this: &Connection);
110
111        /// List of addresses we have started listening on. Must be an array of strings of
112        /// multiaddrs.
113        #[wasm_bindgen(method, getter)]
114        pub fn new_addrs(this: &ListenEvent) -> Option<Box<[JsValue]>>;
115
116        /// List of addresses that have expired. Must be an array of strings of multiaddrs.
117        #[wasm_bindgen(method, getter)]
118        pub fn expired_addrs(this: &ListenEvent) -> Option<Box<[JsValue]>>;
119
120        /// List of [`ConnectionEvent`] object that has been received.
121        #[wasm_bindgen(method, getter)]
122        pub fn new_connections(this: &ListenEvent) -> Option<Box<[JsValue]>>;
123
124        /// Promise to the next event that the listener will generate.
125        #[wasm_bindgen(method, getter)]
126        pub fn next_event(this: &ListenEvent) -> JsValue;
127
128        /// The [`Connection`] object for communication with the remote.
129        #[wasm_bindgen(method, getter)]
130        pub fn connection(this: &ConnectionEvent) -> Connection;
131
132        /// The address we observe for the remote connection.
133        #[wasm_bindgen(method, getter)]
134        pub fn observed_addr(this: &ConnectionEvent) -> String;
135
136        /// The address we are listening on, that received the remote connection.
137        #[wasm_bindgen(method, getter)]
138        pub fn local_addr(this: &ConnectionEvent) -> String;
139    }
140
141    #[cfg(feature = "websocket")]
142    #[wasm_bindgen(module = "/src/websockets.js")]
143    extern "C" {
144        /// Returns a `Transport` implemented using websockets.
145        pub fn websocket_transport() -> Transport;
146    }
147}
148
149/// Implementation of `Transport` whose implementation is handled by some FFI.
150pub struct ExtTransport {
151    inner: SendWrapper<ffi::Transport>,
152    listeners: SelectAll<Listen>,
153}
154
155impl ExtTransport {
156    /// Creates a new `ExtTransport` that uses the given external `Transport`.
157    pub fn new(transport: ffi::Transport) -> Self {
158        ExtTransport {
159            inner: SendWrapper::new(transport),
160            listeners: SelectAll::new(),
161        }
162    }
163
164    fn do_dial(
165        &mut self,
166        addr: Multiaddr,
167        role_override: Endpoint,
168    ) -> Result<<Self as Transport>::Dial, TransportError<<Self as Transport>::Error>> {
169        let promise = self
170            .inner
171            .dial(
172                &addr.to_string(),
173                matches!(role_override, Endpoint::Listener),
174            )
175            .map_err(|err| {
176                if is_not_supported_error(&err) {
177                    TransportError::MultiaddrNotSupported(addr)
178                } else {
179                    TransportError::Other(JsErr::from(err))
180                }
181            })?;
182
183        Ok(Dial {
184            inner: SendWrapper::new(promise.into()),
185        })
186    }
187}
188
189impl fmt::Debug for ExtTransport {
190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191        f.debug_tuple("ExtTransport").finish()
192    }
193}
194
195impl Transport for ExtTransport {
196    type Output = Connection;
197    type Error = JsErr;
198    type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
199    type Dial = Dial;
200
201    fn listen_on(
202        &mut self,
203        listener_id: ListenerId,
204        addr: Multiaddr,
205    ) -> Result<(), TransportError<Self::Error>> {
206        let iter = self.inner.listen_on(&addr.to_string()).map_err(|err| {
207            if is_not_supported_error(&err) {
208                TransportError::MultiaddrNotSupported(addr)
209            } else {
210                TransportError::Other(JsErr::from(err))
211            }
212        })?;
213        let listen = Listen {
214            listener_id,
215            iterator: SendWrapper::new(iter),
216            next_event: None,
217            pending_events: VecDeque::new(),
218            is_closed: false,
219        };
220        self.listeners.push(listen);
221        Ok(())
222    }
223
224    fn remove_listener(&mut self, id: ListenerId) -> bool {
225        match self.listeners.iter_mut().find(|l| l.listener_id == id) {
226            Some(listener) => {
227                listener.close(Ok(()));
228                true
229            }
230            None => false,
231        }
232    }
233
234    fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
235        self.do_dial(addr, Endpoint::Dialer)
236    }
237
238    fn dial_as_listener(
239        &mut self,
240        addr: Multiaddr,
241    ) -> Result<Self::Dial, TransportError<Self::Error>> {
242        self.do_dial(addr, Endpoint::Listener)
243    }
244
245    fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
246        None
247    }
248
249    fn poll(
250        mut self: Pin<&mut Self>,
251        cx: &mut Context<'_>,
252    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
253        match ready!(self.listeners.poll_next_unpin(cx)) {
254            Some(event) => Poll::Ready(event),
255            None => Poll::Pending,
256        }
257    }
258}
259
260/// Future that dial a remote through an external transport.
261#[must_use = "futures do nothing unless polled"]
262pub struct Dial {
263    /// A promise that will resolve to a `ffi::Connection` on success.
264    inner: SendWrapper<JsFuture>,
265}
266
267impl fmt::Debug for Dial {
268    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269        f.debug_tuple("Dial").finish()
270    }
271}
272
273impl Future for Dial {
274    type Output = Result<Connection, JsErr>;
275
276    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
277        match Future::poll(Pin::new(&mut *self.inner), cx) {
278            Poll::Ready(Ok(connec)) => Poll::Ready(Ok(Connection::new(connec.into()))),
279            Poll::Pending => Poll::Pending,
280            Poll::Ready(Err(err)) => Poll::Ready(Err(JsErr::from(err))),
281        }
282    }
283}
284
285/// Stream that listens for incoming connections through an external transport.
286#[must_use = "futures do nothing unless polled"]
287pub struct Listen {
288    listener_id: ListenerId,
289    /// Iterator of `ListenEvent`s.
290    iterator: SendWrapper<js_sys::Iterator>,
291    /// Promise that will yield the next `ListenEvent`.
292    next_event: Option<SendWrapper<JsFuture>>,
293    /// List of events that we are waiting to propagate.
294    pending_events: VecDeque<<Self as Stream>::Item>,
295    /// If the iterator is done close the listener.
296    is_closed: bool,
297}
298
299impl Listen {
300    /// Report the listener as closed and terminate its stream.
301    fn close(&mut self, reason: Result<(), JsErr>) {
302        self.pending_events
303            .push_back(TransportEvent::ListenerClosed {
304                listener_id: self.listener_id,
305                reason,
306            });
307        self.is_closed = true;
308    }
309}
310
311impl fmt::Debug for Listen {
312    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313        f.debug_tuple("Listen").field(&self.listener_id).finish()
314    }
315}
316
317impl Stream for Listen {
318    type Item = TransportEvent<<ExtTransport as Transport>::ListenerUpgrade, JsErr>;
319
320    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
321        loop {
322            if let Some(ev) = self.pending_events.pop_front() {
323                return Poll::Ready(Some(ev));
324            }
325
326            if self.is_closed {
327                // Terminate the stream if the listener closed and all remaining events have been reported.
328                return Poll::Ready(None);
329            }
330
331            // Try to fill `self.next_event` if necessary and possible. If we fail, then
332            // `Ready(None)` is returned below.
333            if self.next_event.is_none() {
334                if let Ok(ev) = self.iterator.next() {
335                    if !ev.done() {
336                        let promise: js_sys::Promise = ev.value().into();
337                        self.next_event = Some(SendWrapper::new(promise.into()));
338                    }
339                }
340            }
341
342            let event = if let Some(next_event) = self.next_event.as_mut() {
343                let e = match Future::poll(Pin::new(&mut **next_event), cx) {
344                    Poll::Ready(Ok(ev)) => ffi::ListenEvent::from(ev),
345                    Poll::Pending => return Poll::Pending,
346                    Poll::Ready(Err(err)) => {
347                        self.close(Err(err.into()));
348                        continue;
349                    }
350                };
351                self.next_event = None;
352                e
353            } else {
354                self.close(Ok(()));
355                continue;
356            };
357
358            let listener_id = self.listener_id;
359
360            if let Some(addrs) = event.new_addrs() {
361                for addr in addrs.iter() {
362                    match js_value_to_addr(addr) {
363                        Ok(addr) => self.pending_events.push_back(TransportEvent::NewAddress {
364                            listener_id,
365                            listen_addr: addr,
366                        }),
367                        Err(err) => self
368                            .pending_events
369                            .push_back(TransportEvent::ListenerError {
370                                listener_id,
371                                error: err,
372                            }),
373                    };
374                }
375            }
376
377            if let Some(upgrades) = event.new_connections() {
378                for upgrade in upgrades.iter().cloned() {
379                    let upgrade: ffi::ConnectionEvent = upgrade.into();
380                    match upgrade.local_addr().parse().and_then(|local| {
381                        let observed = upgrade.observed_addr().parse()?;
382                        Ok((local, observed))
383                    }) {
384                        Ok((local_addr, send_back_addr)) => {
385                            self.pending_events.push_back(TransportEvent::Incoming {
386                                listener_id,
387                                local_addr,
388                                send_back_addr,
389                                upgrade: futures::future::ok(Connection::new(upgrade.connection())),
390                            })
391                        }
392                        Err(err) => self
393                            .pending_events
394                            .push_back(TransportEvent::ListenerError {
395                                listener_id,
396                                error: err.into(),
397                            }),
398                    }
399                }
400            }
401
402            if let Some(addrs) = event.expired_addrs() {
403                for addr in addrs.iter() {
404                    match js_value_to_addr(addr) {
405                        Ok(addr) => self
406                            .pending_events
407                            .push_back(TransportEvent::AddressExpired {
408                                listener_id,
409                                listen_addr: addr,
410                            }),
411                        Err(err) => self
412                            .pending_events
413                            .push_back(TransportEvent::ListenerError {
414                                listener_id,
415                                error: err,
416                            }),
417                    }
418                }
419            }
420        }
421    }
422}
423
424/// Active stream of data with a remote.
425///
426/// It is guaranteed that each call to `io::Write::write` on this object maps to exactly one call
427/// to `write` on the FFI. In other words, no internal buffering happens for writes, and data can't
428/// be split.
429pub struct Connection {
430    /// The FFI object.
431    inner: SendWrapper<ffi::Connection>,
432
433    /// The iterator that was returned by `read()`.
434    read_iterator: SendWrapper<js_sys::Iterator>,
435
436    /// Reading part of the connection.
437    read_state: ConnectionReadState,
438
439    /// When we write data using the FFI, a promise is returned containing the moment when the
440    /// underlying transport is ready to accept data again. This promise is stored here.
441    /// If this is `Some`, we must wait until the contained promise is resolved to write again.
442    previous_write_promise: Option<SendWrapper<JsFuture>>,
443}
444
445impl Connection {
446    /// Initializes a `Connection` object from the FFI connection.
447    fn new(inner: ffi::Connection) -> Self {
448        let read_iterator = inner.read();
449
450        Connection {
451            inner: SendWrapper::new(inner),
452            read_iterator: SendWrapper::new(read_iterator),
453            read_state: ConnectionReadState::PendingData(Vec::new()),
454            previous_write_promise: None,
455        }
456    }
457}
458
459/// Reading side of the connection.
460enum ConnectionReadState {
461    /// Some data have been read and are waiting to be transferred. Can be empty.
462    PendingData(Vec<u8>),
463    /// Waiting for a `Promise` containing the next data.
464    Waiting(SendWrapper<JsFuture>),
465    /// An error occurred or an earlier read yielded EOF.
466    Finished,
467}
468
469impl fmt::Debug for Connection {
470    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
471        f.debug_tuple("Connection").finish()
472    }
473}
474
475impl AsyncRead for Connection {
476    fn poll_read(
477        mut self: Pin<&mut Self>,
478        cx: &mut Context<'_>,
479        buf: &mut [u8],
480    ) -> Poll<Result<usize, io::Error>> {
481        loop {
482            match mem::replace(&mut self.read_state, ConnectionReadState::Finished) {
483                ConnectionReadState::Finished => {
484                    break Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
485                }
486
487                ConnectionReadState::PendingData(ref data) if data.is_empty() => {
488                    let iter_next = self.read_iterator.next().map_err(JsErr::from)?;
489                    if iter_next.done() {
490                        self.read_state = ConnectionReadState::Finished;
491                    } else {
492                        let promise: js_sys::Promise = iter_next.value().into();
493                        let promise = SendWrapper::new(promise.into());
494                        self.read_state = ConnectionReadState::Waiting(promise);
495                    }
496                    continue;
497                }
498
499                ConnectionReadState::PendingData(mut data) => {
500                    debug_assert!(!data.is_empty());
501                    if buf.len() <= data.len() {
502                        buf.copy_from_slice(&data[..buf.len()]);
503                        self.read_state =
504                            ConnectionReadState::PendingData(data.split_off(buf.len()));
505                        break Poll::Ready(Ok(buf.len()));
506                    } else {
507                        let len = data.len();
508                        buf[..len].copy_from_slice(&data);
509                        self.read_state = ConnectionReadState::PendingData(Vec::new());
510                        break Poll::Ready(Ok(len));
511                    }
512                }
513
514                ConnectionReadState::Waiting(mut promise) => {
515                    let data = match Future::poll(Pin::new(&mut *promise), cx) {
516                        Poll::Ready(Ok(ref data)) if data.is_null() => break Poll::Ready(Ok(0)),
517                        Poll::Ready(Ok(data)) => data,
518                        Poll::Ready(Err(err)) => {
519                            break Poll::Ready(Err(io::Error::from(JsErr::from(err))))
520                        }
521                        Poll::Pending => {
522                            self.read_state = ConnectionReadState::Waiting(promise);
523                            break Poll::Pending;
524                        }
525                    };
526
527                    // Try to directly copy the data into `buf` if it is large enough, otherwise
528                    // transition to `PendingData` and loop again.
529                    let data = js_sys::Uint8Array::new(&data);
530                    let data_len = data.length() as usize;
531                    if data_len <= buf.len() {
532                        data.copy_to(&mut buf[..data_len]);
533                        self.read_state = ConnectionReadState::PendingData(Vec::new());
534                        break Poll::Ready(Ok(data_len));
535                    } else {
536                        let mut tmp_buf = vec![0; data_len];
537                        data.copy_to(&mut tmp_buf[..]);
538                        self.read_state = ConnectionReadState::PendingData(tmp_buf);
539                        continue;
540                    }
541                }
542            }
543        }
544    }
545}
546
547impl AsyncWrite for Connection {
548    fn poll_write(
549        mut self: Pin<&mut Self>,
550        cx: &mut Context<'_>,
551        buf: &[u8],
552    ) -> Poll<Result<usize, io::Error>> {
553        // Note: as explained in the doc-comments of `Connection`, each call to this function must
554        // map to exactly one call to `self.inner.write()`.
555
556        if let Some(mut promise) = self.previous_write_promise.take() {
557            match Future::poll(Pin::new(&mut *promise), cx) {
558                Poll::Ready(Ok(_)) => (),
559                Poll::Ready(Err(err)) => {
560                    return Poll::Ready(Err(io::Error::from(JsErr::from(err))))
561                }
562                Poll::Pending => {
563                    self.previous_write_promise = Some(promise);
564                    return Poll::Pending;
565                }
566            }
567        }
568
569        debug_assert!(self.previous_write_promise.is_none());
570        self.previous_write_promise = Some(SendWrapper::new(
571            self.inner.write(buf).map_err(JsErr::from)?.into(),
572        ));
573        Poll::Ready(Ok(buf.len()))
574    }
575
576    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
577        // There's no flushing mechanism. In the FFI we consider that writing implicitly flushes.
578        Poll::Ready(Ok(()))
579    }
580
581    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
582        // Shutting down is considered instantaneous.
583        match self.inner.shutdown() {
584            Ok(()) => Poll::Ready(Ok(())),
585            Err(err) => Poll::Ready(Err(io::Error::from(JsErr::from(err)))),
586        }
587    }
588}
589
590impl Drop for Connection {
591    fn drop(&mut self) {
592        self.inner.close();
593    }
594}
595
596/// Returns true if `err` is an error about an address not being supported.
597fn is_not_supported_error(err: &JsValue) -> bool {
598    if let Some(err) = err.dyn_ref::<js_sys::Error>() {
599        err.name() == "NotSupportedError"
600    } else {
601        false
602    }
603}
604
605/// Turns a `JsValue` containing a `String` into a `Multiaddr`, if possible.
606fn js_value_to_addr(addr: &JsValue) -> Result<Multiaddr, JsErr> {
607    if let Some(addr) = addr.as_string() {
608        Ok(addr.parse()?)
609    } else {
610        Err(JsValue::from_str("Element in new_addrs is not a string").into())
611    }
612}
613
614/// Error that can be generated by the `ExtTransport`.
615pub struct JsErr(SendWrapper<JsValue>);
616
617impl From<JsValue> for JsErr {
618    fn from(val: JsValue) -> JsErr {
619        JsErr(SendWrapper::new(val))
620    }
621}
622
623impl From<libp2p_core::multiaddr::Error> for JsErr {
624    fn from(err: libp2p_core::multiaddr::Error) -> JsErr {
625        JsValue::from_str(&err.to_string()).into()
626    }
627}
628
629impl From<JsErr> for io::Error {
630    fn from(err: JsErr) -> io::Error {
631        io::Error::new(io::ErrorKind::Other, err.to_string())
632    }
633}
634
635impl fmt::Debug for JsErr {
636    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
637        write!(f, "{self}")
638    }
639}
640
641impl fmt::Display for JsErr {
642    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
643        if let Some(s) = self.0.as_string() {
644            write!(f, "{s}")
645        } else if let Some(err) = self.0.dyn_ref::<js_sys::Error>() {
646            write!(f, "{}", String::from(err.message()))
647        } else if let Some(obj) = self.0.dyn_ref::<js_sys::Object>() {
648            write!(f, "{}", String::from(obj.to_string()))
649        } else {
650            write!(f, "{:?}", &*self.0)
651        }
652    }
653}
654
655impl error::Error for JsErr {}