libp2p_websocket/lib.rs
1// Copyright 2017-2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the libp2p `Transport` trait for Websockets.
22
23#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
24
25pub mod error;
26pub mod framed;
27mod quicksink;
28pub mod tls;
29
30use std::{
31 io,
32 pin::Pin,
33 task::{Context, Poll},
34};
35
36use error::Error;
37use framed::{Connection, Incoming};
38use futures::{future::BoxFuture, prelude::*, ready};
39use libp2p_core::{
40 connection::ConnectedPoint,
41 multiaddr::Multiaddr,
42 transport::{map::MapFuture, DialOpts, ListenerId, TransportError, TransportEvent},
43 Transport,
44};
45use rw_stream_sink::RwStreamSink;
46
47/// A Websocket transport.
48///
49/// DO NOT wrap this transport with a DNS transport if you want Secure Websockets to work.
50///
51/// A Secure Websocket transport needs to wrap DNS transport to resolve domain names after
52/// they are checked against the remote certificates. Use a combination of DNS and TCP transports
53/// to build a Secure Websocket transport.
54///
55/// If you don't need Secure Websocket's support, use a plain TCP transport as an inner transport.
56///
57/// # Dependencies
58///
59/// This transport requires the `zlib` shared library to be installed on the system.
60///
61/// Future releases might lift this requirement, see <https://github.com/paritytech/soketto/issues/72>.
62///
63/// # Examples
64///
65/// Secure Websocket transport:
66///
67/// ```
68/// # use futures::future;
69/// # use libp2p_core::{transport::ListenerId, Transport};
70/// # use libp2p_dns as dns;
71/// # use libp2p_tcp as tcp;
72/// # use libp2p_websocket as websocket;
73/// # use rcgen::generate_simple_self_signed;
74/// # use std::pin::Pin;
75/// #
76/// # #[async_std::main]
77/// # async fn main() {
78///
79/// let mut transport = websocket::WsConfig::new(
80/// dns::async_std::Transport::system(tcp::async_io::Transport::new(tcp::Config::default()))
81/// .await
82/// .unwrap(),
83/// );
84///
85/// let rcgen_cert = generate_simple_self_signed(vec!["localhost".to_string()]).unwrap();
86/// let priv_key = websocket::tls::PrivateKey::new(rcgen_cert.serialize_private_key_der());
87/// let cert = websocket::tls::Certificate::new(rcgen_cert.serialize_der().unwrap());
88/// transport.set_tls_config(websocket::tls::Config::new(priv_key, vec![cert]).unwrap());
89///
90/// let id = transport
91/// .listen_on(
92/// ListenerId::next(),
93/// "/ip4/127.0.0.1/tcp/0/tls/ws".parse().unwrap(),
94/// )
95/// .unwrap();
96///
97/// let addr = future::poll_fn(|cx| Pin::new(&mut transport).poll(cx))
98/// .await
99/// .into_new_address()
100/// .unwrap();
101/// println!("Listening on {addr}");
102///
103/// # }
104/// ```
105///
106/// Plain Websocket transport:
107///
108/// ```
109/// # use futures::future;
110/// # use libp2p_core::{transport::ListenerId, Transport};
111/// # use libp2p_dns as dns;
112/// # use libp2p_tcp as tcp;
113/// # use libp2p_websocket as websocket;
114/// # use std::pin::Pin;
115/// #
116/// # #[async_std::main]
117/// # async fn main() {
118///
119/// let mut transport =
120/// websocket::WsConfig::new(tcp::async_io::Transport::new(tcp::Config::default()));
121///
122/// let id = transport
123/// .listen_on(
124/// ListenerId::next(),
125/// "/ip4/127.0.0.1/tcp/0/ws".parse().unwrap(),
126/// )
127/// .unwrap();
128///
129/// let addr = future::poll_fn(|cx| Pin::new(&mut transport).poll(cx))
130/// .await
131/// .into_new_address()
132/// .unwrap();
133/// println!("Listening on {addr}");
134///
135/// # }
136/// ```
137#[derive(Debug)]
138pub struct WsConfig<T: Transport>
139where
140 T: Transport,
141 T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static,
142{
143 transport: libp2p_core::transport::map::Map<framed::WsConfig<T>, WrapperFn<T::Output>>,
144}
145
146impl<T: Transport> WsConfig<T>
147where
148 T: Transport + Send + Unpin + 'static,
149 T::Error: Send + 'static,
150 T::Dial: Send + 'static,
151 T::ListenerUpgrade: Send + 'static,
152 T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static,
153{
154 /// Create a new websocket transport based on the given transport.
155 ///
156 /// > **Note*: The given transport must be based on TCP/IP and should
157 /// > usually incorporate DNS resolution, though the latter is not
158 /// > strictly necessary if one wishes to only use the `Ws` protocol
159 /// > with known IP addresses and ports. See [`libp2p-tcp`](https://docs.rs/libp2p-tcp/)
160 /// > and [`libp2p-dns`](https://docs.rs/libp2p-dns) for constructing
161 /// > the inner transport.
162 pub fn new(transport: T) -> Self {
163 Self {
164 transport: framed::WsConfig::new(transport)
165 .map(wrap_connection as WrapperFn<T::Output>),
166 }
167 }
168
169 /// Return the configured maximum number of redirects.
170 pub fn max_redirects(&self) -> u8 {
171 self.transport.inner().max_redirects()
172 }
173
174 /// Set max. number of redirects to follow.
175 pub fn set_max_redirects(&mut self, max: u8) -> &mut Self {
176 self.transport.inner_mut().set_max_redirects(max);
177 self
178 }
179
180 /// Get the max. frame data size we support.
181 pub fn max_data_size(&self) -> usize {
182 self.transport.inner().max_data_size()
183 }
184
185 /// Set the max. frame data size we support.
186 pub fn set_max_data_size(&mut self, size: usize) -> &mut Self {
187 self.transport.inner_mut().set_max_data_size(size);
188 self
189 }
190
191 /// Set the TLS configuration if TLS support is desired.
192 pub fn set_tls_config(&mut self, c: tls::Config) -> &mut Self {
193 self.transport.inner_mut().set_tls_config(c);
194 self
195 }
196}
197
198impl<T> Transport for WsConfig<T>
199where
200 T: Transport + Send + Unpin + 'static,
201 T::Error: Send + 'static,
202 T::Dial: Send + 'static,
203 T::ListenerUpgrade: Send + 'static,
204 T::Output: AsyncRead + AsyncWrite + Unpin + Send + 'static,
205{
206 type Output = RwStreamSink<BytesConnection<T::Output>>;
207 type Error = Error<T::Error>;
208 type ListenerUpgrade = MapFuture<InnerFuture<T::Output, T::Error>, WrapperFn<T::Output>>;
209 type Dial = MapFuture<InnerFuture<T::Output, T::Error>, WrapperFn<T::Output>>;
210
211 fn listen_on(
212 &mut self,
213 id: ListenerId,
214 addr: Multiaddr,
215 ) -> Result<(), TransportError<Self::Error>> {
216 self.transport.listen_on(id, addr)
217 }
218
219 fn remove_listener(&mut self, id: ListenerId) -> bool {
220 self.transport.remove_listener(id)
221 }
222
223 fn dial(
224 &mut self,
225 addr: Multiaddr,
226 opts: DialOpts,
227 ) -> Result<Self::Dial, TransportError<Self::Error>> {
228 self.transport.dial(addr, opts)
229 }
230
231 fn poll(
232 mut self: Pin<&mut Self>,
233 cx: &mut Context<'_>,
234 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
235 Pin::new(&mut self.transport).poll(cx)
236 }
237}
238
239/// Type alias corresponding to `framed::WsConfig::Dial` and `framed::WsConfig::ListenerUpgrade`.
240pub type InnerFuture<T, E> = BoxFuture<'static, Result<Connection<T>, Error<E>>>;
241
242/// Function type that wraps a websocket connection (see. `wrap_connection`).
243pub type WrapperFn<T> = fn(Connection<T>, ConnectedPoint) -> RwStreamSink<BytesConnection<T>>;
244
245/// Wrap a websocket connection producing data frames into a `RwStreamSink`
246/// implementing `AsyncRead` + `AsyncWrite`.
247fn wrap_connection<T>(c: Connection<T>, _: ConnectedPoint) -> RwStreamSink<BytesConnection<T>>
248where
249 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
250{
251 RwStreamSink::new(BytesConnection(c))
252}
253
254/// The websocket connection.
255#[derive(Debug)]
256pub struct BytesConnection<T>(Connection<T>);
257
258impl<T> Stream for BytesConnection<T>
259where
260 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
261{
262 type Item = io::Result<Vec<u8>>;
263
264 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
265 loop {
266 if let Some(item) = ready!(self.0.try_poll_next_unpin(cx)?) {
267 if let Incoming::Data(payload) = item {
268 return Poll::Ready(Some(Ok(payload.into_bytes())));
269 }
270 } else {
271 return Poll::Ready(None);
272 }
273 }
274 }
275}
276
277impl<T> Sink<Vec<u8>> for BytesConnection<T>
278where
279 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
280{
281 type Error = io::Error;
282
283 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
284 Pin::new(&mut self.0).poll_ready(cx)
285 }
286
287 fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> io::Result<()> {
288 Pin::new(&mut self.0).start_send(framed::OutgoingData::Binary(item))
289 }
290
291 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
292 Pin::new(&mut self.0).poll_flush(cx)
293 }
294
295 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
296 Pin::new(&mut self.0).poll_close(cx)
297 }
298}
299
300// Tests //////////////////////////////////////////////////////////////////////////////////////////
301
302#[cfg(test)]
303mod tests {
304 use futures::prelude::*;
305 use libp2p_core::{
306 multiaddr::Protocol,
307 transport::{DialOpts, ListenerId, PortUse},
308 Endpoint, Multiaddr, Transport,
309 };
310 use libp2p_identity::PeerId;
311 use libp2p_tcp as tcp;
312
313 use super::WsConfig;
314
315 #[test]
316 fn dialer_connects_to_listener_ipv4() {
317 let a = "/ip4/127.0.0.1/tcp/0/ws".parse().unwrap();
318 futures::executor::block_on(connect(a))
319 }
320
321 #[test]
322 fn dialer_connects_to_listener_ipv6() {
323 let a = "/ip6/::1/tcp/0/ws".parse().unwrap();
324 futures::executor::block_on(connect(a))
325 }
326
327 fn new_ws_config() -> WsConfig<tcp::async_io::Transport> {
328 WsConfig::new(tcp::async_io::Transport::new(tcp::Config::default()))
329 }
330
331 async fn connect(listen_addr: Multiaddr) {
332 let mut ws_config = new_ws_config().boxed();
333 ws_config
334 .listen_on(ListenerId::next(), listen_addr)
335 .expect("listener");
336
337 let addr = ws_config
338 .next()
339 .await
340 .expect("no error")
341 .into_new_address()
342 .expect("listen address");
343
344 assert_eq!(Some(Protocol::Ws("/".into())), addr.iter().nth(2));
345 assert_ne!(Some(Protocol::Tcp(0)), addr.iter().nth(1));
346
347 let inbound = async move {
348 let (conn, _addr) = ws_config
349 .select_next_some()
350 .map(|ev| ev.into_incoming())
351 .await
352 .unwrap();
353 conn.await
354 };
355
356 let outbound = new_ws_config()
357 .boxed()
358 .dial(
359 addr.with(Protocol::P2p(PeerId::random())),
360 DialOpts {
361 role: Endpoint::Dialer,
362 port_use: PortUse::New,
363 },
364 )
365 .unwrap();
366
367 let (a, b) = futures::join!(inbound, outbound);
368 a.and(b).unwrap();
369 }
370}