libp2p_mplex/
lib.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the Stream Multiplexer [Mplex](https://github.com/libp2p/specs/blob/master/mplex/README.md) protocol.
22
23#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
24
25mod codec;
26mod config;
27mod io;
28
29use std::{
30    cmp, iter,
31    pin::Pin,
32    sync::Arc,
33    task::{Context, Poll},
34};
35
36use bytes::Bytes;
37use codec::LocalStreamId;
38pub use config::{MaxBufferBehaviour, MplexConfig};
39use futures::{prelude::*, ready};
40use libp2p_core::{
41    muxing::{StreamMuxer, StreamMuxerEvent},
42    upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeInfo},
43};
44use parking_lot::Mutex;
45
46impl UpgradeInfo for MplexConfig {
47    type Info = &'static str;
48    type InfoIter = iter::Once<Self::Info>;
49
50    fn protocol_info(&self) -> Self::InfoIter {
51        iter::once(self.protocol_name)
52    }
53}
54
55impl<C> InboundConnectionUpgrade<C> for MplexConfig
56where
57    C: AsyncRead + AsyncWrite + Unpin,
58{
59    type Output = Multiplex<C>;
60    type Error = io::Error;
61    type Future = future::Ready<Result<Self::Output, io::Error>>;
62
63    fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
64        future::ready(Ok(Multiplex {
65            #[allow(unknown_lints, clippy::arc_with_non_send_sync)] // `T` is not enforced to be `Send` but we don't want to constrain it either.
66            io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
67        }))
68    }
69}
70
71impl<C> OutboundConnectionUpgrade<C> for MplexConfig
72where
73    C: AsyncRead + AsyncWrite + Unpin,
74{
75    type Output = Multiplex<C>;
76    type Error = io::Error;
77    type Future = future::Ready<Result<Self::Output, io::Error>>;
78
79    fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
80        future::ready(Ok(Multiplex {
81            #[allow(unknown_lints, clippy::arc_with_non_send_sync)] // `T` is not enforced to be `Send` but we don't want to constrain it either.
82            io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
83        }))
84    }
85}
86
87/// Multiplexer. Implements the `StreamMuxer` trait.
88pub struct Multiplex<C> {
89    io: Arc<Mutex<io::Multiplexed<C>>>,
90}
91
92impl<C> StreamMuxer for Multiplex<C>
93where
94    C: AsyncRead + AsyncWrite + Unpin,
95{
96    type Substream = Substream<C>;
97    type Error = io::Error;
98
99    fn poll_inbound(
100        self: Pin<&mut Self>,
101        cx: &mut Context<'_>,
102    ) -> Poll<Result<Self::Substream, Self::Error>> {
103        self.io
104            .lock()
105            .poll_next_stream(cx)
106            .map_ok(|stream_id| Substream::new(stream_id, self.io.clone()))
107    }
108
109    fn poll_outbound(
110        self: Pin<&mut Self>,
111        cx: &mut Context<'_>,
112    ) -> Poll<Result<Self::Substream, Self::Error>> {
113        self.io
114            .lock()
115            .poll_open_stream(cx)
116            .map_ok(|stream_id| Substream::new(stream_id, self.io.clone()))
117    }
118
119    fn poll(
120        self: Pin<&mut Self>,
121        _: &mut Context<'_>,
122    ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
123        Poll::Pending
124    }
125
126    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
127        self.io.lock().poll_close(cx)
128    }
129}
130
131impl<C> AsyncRead for Substream<C>
132where
133    C: AsyncRead + AsyncWrite + Unpin,
134{
135    fn poll_read(
136        self: Pin<&mut Self>,
137        cx: &mut Context<'_>,
138        buf: &mut [u8],
139    ) -> Poll<io::Result<usize>> {
140        let this = self.get_mut();
141
142        loop {
143            // Try to read from the current (i.e. last received) frame.
144            if !this.current_data.is_empty() {
145                let len = cmp::min(this.current_data.len(), buf.len());
146                buf[..len].copy_from_slice(&this.current_data.split_to(len));
147                return Poll::Ready(Ok(len));
148            }
149
150            // Read the next data frame from the multiplexed stream.
151            match ready!(this.io.lock().poll_read_stream(cx, this.id))? {
152                Some(data) => {
153                    this.current_data = data;
154                }
155                None => return Poll::Ready(Ok(0)),
156            }
157        }
158    }
159}
160
161impl<C> AsyncWrite for Substream<C>
162where
163    C: AsyncRead + AsyncWrite + Unpin,
164{
165    fn poll_write(
166        self: Pin<&mut Self>,
167        cx: &mut Context<'_>,
168        buf: &[u8],
169    ) -> Poll<io::Result<usize>> {
170        let this = self.get_mut();
171
172        this.io.lock().poll_write_stream(cx, this.id, buf)
173    }
174
175    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
176        let this = self.get_mut();
177
178        this.io.lock().poll_flush_stream(cx, this.id)
179    }
180
181    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
182        let this = self.get_mut();
183        let mut io = this.io.lock();
184
185        ready!(io.poll_close_stream(cx, this.id))?;
186        ready!(io.poll_flush_stream(cx, this.id))?;
187
188        Poll::Ready(Ok(()))
189    }
190}
191
192/// Active substream to the remote.
193pub struct Substream<C>
194where
195    C: AsyncRead + AsyncWrite + Unpin,
196{
197    /// The unique, local identifier of the substream.
198    id: LocalStreamId,
199    /// The current data frame the substream is reading from.
200    current_data: Bytes,
201    /// Shared reference to the actual muxer.
202    io: Arc<Mutex<io::Multiplexed<C>>>,
203}
204
205impl<C> Substream<C>
206where
207    C: AsyncRead + AsyncWrite + Unpin,
208{
209    fn new(id: LocalStreamId, io: Arc<Mutex<io::Multiplexed<C>>>) -> Self {
210        Self {
211            id,
212            current_data: Bytes::new(),
213            io,
214        }
215    }
216}
217
218impl<C> Drop for Substream<C>
219where
220    C: AsyncRead + AsyncWrite + Unpin,
221{
222    fn drop(&mut self) {
223        self.io.lock().drop_stream(self.id);
224    }
225}