1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
24
25mod codec;
26mod config;
27mod io;
28
29use std::{
30 cmp, iter,
31 pin::Pin,
32 sync::Arc,
33 task::{Context, Poll},
34};
35
36use bytes::Bytes;
37use codec::LocalStreamId;
38pub use config::{MaxBufferBehaviour, MplexConfig};
39use futures::{prelude::*, ready};
40use libp2p_core::{
41 muxing::{StreamMuxer, StreamMuxerEvent},
42 upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeInfo},
43};
44use parking_lot::Mutex;
45
46impl UpgradeInfo for MplexConfig {
47 type Info = &'static str;
48 type InfoIter = iter::Once<Self::Info>;
49
50 fn protocol_info(&self) -> Self::InfoIter {
51 iter::once(self.protocol_name)
52 }
53}
54
55impl<C> InboundConnectionUpgrade<C> for MplexConfig
56where
57 C: AsyncRead + AsyncWrite + Unpin,
58{
59 type Output = Multiplex<C>;
60 type Error = io::Error;
61 type Future = future::Ready<Result<Self::Output, io::Error>>;
62
63 fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
64 future::ready(Ok(Multiplex {
65 #[allow(unknown_lints, clippy::arc_with_non_send_sync)] io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
67 }))
68 }
69}
70
71impl<C> OutboundConnectionUpgrade<C> for MplexConfig
72where
73 C: AsyncRead + AsyncWrite + Unpin,
74{
75 type Output = Multiplex<C>;
76 type Error = io::Error;
77 type Future = future::Ready<Result<Self::Output, io::Error>>;
78
79 fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
80 future::ready(Ok(Multiplex {
81 #[allow(unknown_lints, clippy::arc_with_non_send_sync)] io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
83 }))
84 }
85}
86
87pub struct Multiplex<C> {
89 io: Arc<Mutex<io::Multiplexed<C>>>,
90}
91
92impl<C> StreamMuxer for Multiplex<C>
93where
94 C: AsyncRead + AsyncWrite + Unpin,
95{
96 type Substream = Substream<C>;
97 type Error = io::Error;
98
99 fn poll_inbound(
100 self: Pin<&mut Self>,
101 cx: &mut Context<'_>,
102 ) -> Poll<Result<Self::Substream, Self::Error>> {
103 self.io
104 .lock()
105 .poll_next_stream(cx)
106 .map_ok(|stream_id| Substream::new(stream_id, self.io.clone()))
107 }
108
109 fn poll_outbound(
110 self: Pin<&mut Self>,
111 cx: &mut Context<'_>,
112 ) -> Poll<Result<Self::Substream, Self::Error>> {
113 self.io
114 .lock()
115 .poll_open_stream(cx)
116 .map_ok(|stream_id| Substream::new(stream_id, self.io.clone()))
117 }
118
119 fn poll(
120 self: Pin<&mut Self>,
121 _: &mut Context<'_>,
122 ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
123 Poll::Pending
124 }
125
126 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
127 self.io.lock().poll_close(cx)
128 }
129}
130
131impl<C> AsyncRead for Substream<C>
132where
133 C: AsyncRead + AsyncWrite + Unpin,
134{
135 fn poll_read(
136 self: Pin<&mut Self>,
137 cx: &mut Context<'_>,
138 buf: &mut [u8],
139 ) -> Poll<io::Result<usize>> {
140 let this = self.get_mut();
141
142 loop {
143 if !this.current_data.is_empty() {
145 let len = cmp::min(this.current_data.len(), buf.len());
146 buf[..len].copy_from_slice(&this.current_data.split_to(len));
147 return Poll::Ready(Ok(len));
148 }
149
150 match ready!(this.io.lock().poll_read_stream(cx, this.id))? {
152 Some(data) => {
153 this.current_data = data;
154 }
155 None => return Poll::Ready(Ok(0)),
156 }
157 }
158 }
159}
160
161impl<C> AsyncWrite for Substream<C>
162where
163 C: AsyncRead + AsyncWrite + Unpin,
164{
165 fn poll_write(
166 self: Pin<&mut Self>,
167 cx: &mut Context<'_>,
168 buf: &[u8],
169 ) -> Poll<io::Result<usize>> {
170 let this = self.get_mut();
171
172 this.io.lock().poll_write_stream(cx, this.id, buf)
173 }
174
175 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
176 let this = self.get_mut();
177
178 this.io.lock().poll_flush_stream(cx, this.id)
179 }
180
181 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
182 let this = self.get_mut();
183 let mut io = this.io.lock();
184
185 ready!(io.poll_close_stream(cx, this.id))?;
186 ready!(io.poll_flush_stream(cx, this.id))?;
187
188 Poll::Ready(Ok(()))
189 }
190}
191
192pub struct Substream<C>
194where
195 C: AsyncRead + AsyncWrite + Unpin,
196{
197 id: LocalStreamId,
199 current_data: Bytes,
201 io: Arc<Mutex<io::Multiplexed<C>>>,
203}
204
205impl<C> Substream<C>
206where
207 C: AsyncRead + AsyncWrite + Unpin,
208{
209 fn new(id: LocalStreamId, io: Arc<Mutex<io::Multiplexed<C>>>) -> Self {
210 Self {
211 id,
212 current_data: Bytes::new(),
213 io,
214 }
215 }
216}
217
218impl<C> Drop for Substream<C>
219where
220 C: AsyncRead + AsyncWrite + Unpin,
221{
222 fn drop(&mut self) {
223 self.io.lock().drop_stream(self.id);
224 }
225}