mod codec;
use std::{cmp, iter, mem};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
use bytes::Bytes;
use libp2p_core::{
Endpoint,
StreamMuxer,
muxing::Shutdown,
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}
};
use log::{debug, trace};
use parking_lot::Mutex;
use fnv::{FnvHashMap, FnvHashSet};
use futures::{prelude::*, executor, future, stream::Fuse, task, task_local, try_ready};
use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite};
/// Configuration for the mplex multiplexer.
///
/// Also serves as the upgrade object: the `InboundUpgrade` / `OutboundUpgrade`
/// implementations in this file turn a socket into a `Multiplex` using it.
#[derive(Debug, Clone)]
pub struct MplexConfig {
/// Limit on the number of tracked open substreams. `poll_inbound` fails with
/// an error once the number of open substreams reaches this value.
max_substreams: usize,
/// Maximum number of frames held in the internal buffer of frames that were
/// read from the socket but not yet consumed.
max_buffer_len: usize,
/// What to do when the internal buffer holds `max_buffer_len` frames.
max_buffer_behaviour: MaxBufferBehaviour,
/// Upper bound on the number of bytes put into a single data frame by
/// `write_substream`.
split_send_size: usize,
}
impl MplexConfig {
#[inline]
pub fn new() -> MplexConfig {
Default::default()
}
#[inline]
pub fn max_substreams(&mut self, max: usize) -> &mut Self {
self.max_substreams = max;
self
}
#[inline]
pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
self.max_buffer_len = max;
self
}
#[inline]
pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
self.max_buffer_behaviour = behaviour;
self
}
pub fn split_send_size(&mut self, size: usize) -> &mut Self {
let size = cmp::min(size, codec::MAX_FRAME_SIZE);
self.split_send_size = size;
self
}
#[inline]
fn upgrade<C>(self, i: C) -> Multiplex<C>
where
C: AsyncRead + AsyncWrite
{
let max_buffer_len = self.max_buffer_len;
Multiplex {
inner: Mutex::new(MultiplexInner {
error: Ok(()),
inner: executor::spawn(Framed::new(i, codec::Codec::new()).fuse()),
config: self,
buffer: Vec::with_capacity(cmp::min(max_buffer_len, 512)),
opened_substreams: Default::default(),
next_outbound_stream_id: 0,
notifier_read: Arc::new(Notifier {
to_notify: Mutex::new(Default::default()),
}),
notifier_write: Arc::new(Notifier {
to_notify: Mutex::new(Default::default()),
}),
is_shutdown: false,
is_acknowledged: false,
})
}
}
}
impl Default for MplexConfig {
    /// Default limits: 128 substreams, a 4096-frame buffer that closes everything
    /// when full, and data frames of at most 1024 bytes.
    #[inline]
    fn default() -> MplexConfig {
        let max_buffer_behaviour = MaxBufferBehaviour::CloseAll;
        MplexConfig {
            split_send_size: 1024,
            max_buffer_behaviour,
            max_buffer_len: 4096,
            max_substreams: 128,
        }
    }
}
/// Behaviour when the internal buffer of unconsumed frames reaches
/// `max_buffer_len`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MaxBufferBehaviour {
/// Record a permanent "reached maximum buffer length" error; every later
/// attempt to read from the multiplexer returns that error.
CloseAll,
/// Stop pulling frames from the socket and report "not ready" until a
/// buffered frame has been consumed.
Block,
}
impl UpgradeInfo for MplexConfig {
    type Info = &'static [u8];
    type InfoIter = iter::Once<Self::Info>;

    /// Advertises the single protocol name supported by this upgrade.
    #[inline]
    fn protocol_info(&self) -> Self::InfoIter {
        const PROTOCOL_NAME: &[u8] = b"/mplex/6.7.0";
        iter::once(PROTOCOL_NAME)
    }
}
impl<C> InboundUpgrade<C> for MplexConfig
where
    C: AsyncRead + AsyncWrite,
{
    type Output = Multiplex<C>;
    type Error = IoError;
    type Future = future::FutureResult<Self::Output, IoError>;

    /// Wraps an incoming connection in a multiplexer. No handshake is performed,
    /// so the returned future is immediately ready and never fails.
    fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
        let muxer = self.upgrade(socket);
        future::ok(muxer)
    }
}
impl<C> OutboundUpgrade<C> for MplexConfig
where
    C: AsyncRead + AsyncWrite,
{
    type Output = Multiplex<C>;
    type Error = IoError;
    type Future = future::FutureResult<Self::Output, IoError>;

    /// Wraps an outgoing connection in a multiplexer. No handshake is performed,
    /// so the returned future is immediately ready and never fails.
    fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
        let muxer = self.upgrade(socket);
        future::ok(muxer)
    }
}
/// Multiplexer over a connection of type `C`. Implements the `StreamMuxer` trait.
pub struct Multiplex<C> {
/// All mutable state, behind a mutex because the `StreamMuxer` methods take `&self`.
inner: Mutex<MultiplexInner<C>>,
}
// State of a multiplexed connection; lives inside the mutex of `Multiplex`.
struct MultiplexInner<C> {
// `Err` once a fatal read-side error has been recorded by `next_match`; every later
// call to `next_match` then returns a copy of that error.
error: Result<(), IoError>,
// The framed connection, wrapped in `executor::Spawn` so that it can be polled with
// the custom notifiers below.
inner: executor::Spawn<Fuse<Framed<C, codec::Codec>>>,
// Configuration this multiplexer was created with.
config: MplexConfig,
// Frames read from the socket that the reader at the time was not interested in.
// Never holds more than `config.max_buffer_len` entries.
buffer: Vec<codec::Elem>,
// Substreams currently considered open, as (substream id, endpoint) pairs:
// `Endpoint::Listener` for substreams opened by the remote, `Endpoint::Dialer` for
// substreams opened locally.
opened_substreams: FnvHashSet<(u32, Endpoint)>,
// Identifier that the next locally-opened substream will use.
next_outbound_stream_id: u32,
// Holds the tasks to wake up when the read side of the connection makes progress.
notifier_read: Arc<Notifier>,
// Holds the tasks to wake up when the write side of the connection makes progress.
notifier_write: Arc<Notifier>,
// Set once `shutdown` has completed; sending and flushing are refused afterwards.
is_shutdown: bool,
// Set as soon as any frame has been received from the remote.
is_acknowledged: bool,
}
// Fan-out notifier: wakes up every task registered in `to_notify` when notified.
struct Notifier {
// Tasks waiting for a notification, keyed by their `TASK_ID` so that a task that
// registers several times occupies a single entry.
to_notify: Mutex<FnvHashMap<usize, task::Task>>,
}
impl executor::Notify for Notifier {
    /// Wakes up every registered task and forgets all of them; tasks that still
    /// care must register again. The notification id is ignored.
    fn notify(&self, _: usize) {
        // Swap the registry for an empty one inside a short scope so the lock is
        // released before any task is woken up.
        let pending = {
            let mut registry = self.to_notify.lock();
            mem::replace(&mut *registry, Default::default())
        };
        for waiting_task in pending.values() {
            waiting_task.notify();
        }
    }
}
// Global counter from which each task draws its identifier.
static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0);
// Per-task identifier, initialised from the counter above. It is used as the key
// under which a task registers itself in a `Notifier`, so registering twice from the
// same task overwrites the previous entry instead of adding a new one.
task_local!{
static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed)
}
/// Returns the first incoming frame for which `filter` yields `Some`.
///
/// Frames already sitting in the buffer are examined first; after that, frames are
/// pulled from the socket one at a time. Frames read from the socket that do not match
/// are buffered if they belong to an open substream (or are "open" messages), and
/// dropped otherwise.
///
/// Returns `Ready(None)` when the underlying stream has ended, and `NotReady` (after
/// registering the current task with the read notifier) when no frame is available or
/// when the buffer is full in `Block` mode. Any error recorded earlier is returned
/// again on every call.
fn next_match<C, F, O>(inner: &mut MultiplexInner<C>, mut filter: F) -> Poll<Option<O>, IoError>
where C: AsyncRead + AsyncWrite,
      F: FnMut(&codec::Elem) -> Option<O>,
{
    // A previously recorded error is sticky: hand back a copy of it.
    if let Err(ref previous) = inner.error {
        return Err(IoError::new(previous.kind(), previous.to_string()));
    }

    // First pass: frames that were buffered by earlier calls.
    let buffered_hit = inner.buffer
        .iter()
        .enumerate()
        .find_map(|(position, frame)| filter(frame).map(|out| (position, out)));
    if let Some((position, out)) = buffered_hit {
        // The buffer is full and is about to free a slot: wake up everyone who may
        // have been blocked on it.
        if inner.buffer.len() == inner.config.max_buffer_len {
            executor::Notify::notify(&*inner.notifier_read, 0);
        }
        inner.buffer.remove(position);
        return Ok(Async::Ready(Some(out)));
    }

    // Second pass: pull frames from the socket until one matches.
    loop {
        debug_assert!(inner.buffer.len() <= inner.config.max_buffer_len);
        if inner.buffer.len() == inner.config.max_buffer_len {
            debug!("Reached mplex maximum buffer length");
            return match inner.config.max_buffer_behaviour {
                MaxBufferBehaviour::CloseAll => {
                    inner.error = Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length"));
                    Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length"))
                }
                MaxBufferBehaviour::Block => {
                    inner.notifier_read.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
                    Ok(Async::NotReady)
                }
            };
        }

        let frame = match inner.inner.poll_stream_notify(&inner.notifier_read, 0) {
            Ok(Async::Ready(Some(frame))) => frame,
            Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
            Ok(Async::NotReady) => {
                inner.notifier_read.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
                return Ok(Async::NotReady);
            }
            Err(failure) => {
                // `IoError` is not `Clone`: keep the original, return a rebuilt copy.
                let copy = IoError::new(failure.kind(), failure.to_string());
                inner.error = Err(failure);
                return Err(copy);
            }
        };

        trace!("Received message: {:?}", frame);
        // Receiving anything at all proves that the remote speaks the protocol.
        inner.is_acknowledged = true;

        // Keep the set of open substreams up to date.
        match frame {
            codec::Elem::Open { substream_id } => {
                let newly_opened = inner.opened_substreams.insert((substream_id, Endpoint::Listener));
                if !newly_opened {
                    debug!("Received open message for substream {} which was already open", substream_id);
                }
            }
            codec::Elem::Close { substream_id, endpoint, .. } | codec::Elem::Reset { substream_id, endpoint, .. } => {
                // The set is keyed by the negation of the endpoint carried in the frame.
                inner.opened_substreams.remove(&(substream_id, !endpoint));
            }
            _ => ()
        }

        if let Some(out) = filter(&frame) {
            return Ok(Async::Ready(Some(out)));
        }

        // Not what the caller wants: keep the frame for later if it is relevant.
        let sender = frame.endpoint().unwrap_or(Endpoint::Dialer);
        if inner.opened_substreams.contains(&(frame.substream_id(), !sender)) || frame.is_open_msg() {
            inner.buffer.push(frame);
        } else if !frame.is_close_or_reset_msg() {
            debug!("Ignored message {:?} because the substream wasn't open", frame);
        }
    }
}
/// Attempts to queue `elem` for sending on the connection.
///
/// Fails if the connection has been shut down or if the sink reports an error. When
/// the sink cannot accept the frame yet, the current task is registered with the write
/// notifier and `NotReady` is returned; the frame is dropped, so the caller has to
/// build it again on the next attempt.
fn poll_send<C>(inner: &mut MultiplexInner<C>, elem: codec::Elem) -> Poll<(), IoError>
where C: AsyncRead + AsyncWrite
{
    if inner.is_shutdown {
        return Err(IoError::new(IoErrorKind::Other, "connection is shut down"));
    }

    let outcome = inner.inner.start_send_notify(elem, &inner.notifier_write, 0)?;
    if let AsyncSink::NotReady(_) = outcome {
        inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
        return Ok(Async::NotReady);
    }

    Ok(Async::Ready(()))
}
impl<C> StreamMuxer for Multiplex<C>
where C: AsyncRead + AsyncWrite
{
    type Substream = Substream;
    type OutboundSubstream = OutboundSubstream;

    /// Polls for a substream opened by the remote.
    ///
    /// Fails with `ConnectionRefused` when the number of open substreams has reached
    /// the configured maximum. Returns `Ready(None)` once the connection has ended.
    fn poll_inbound(&self) -> Poll<Option<Self::Substream>, IoError> {
        let mut inner = self.inner.lock();

        if inner.opened_substreams.len() >= inner.config.max_substreams {
            debug!("Refused substream; reached maximum number of substreams {}", inner.config.max_substreams);
            return Err(IoError::new(IoErrorKind::ConnectionRefused,
                                    "exceeded maximum number of open substreams"));
        }

        // Only "open" frames are of interest here.
        let num = try_ready!(next_match(&mut inner, |elem| {
            match elem {
                codec::Elem::Open { substream_id } => Some(*substream_id),
                _ => None,
            }
        }));

        if let Some(num) = num {
            debug!("Successfully opened inbound substream {}", num);
            Ok(Async::Ready(Some(Substream {
                current_data: Bytes::new(),
                num,
                endpoint: Endpoint::Listener,
            })))
        } else {
            Ok(Async::Ready(None))
        }
    }

    /// Starts opening a substream towards the remote.
    ///
    /// Allocates the substream id and marks the substream as open right away; the
    /// "open" frame itself is sent by `poll_outbound`.
    ///
    /// # Panics
    ///
    /// Panics if the `u32` substream id counter overflows.
    fn open_outbound(&self) -> Self::OutboundSubstream {
        let mut inner = self.inner.lock();

        // Assign a substream ID now.
        let substream_id = {
            let n = inner.next_outbound_stream_id;
            inner.next_outbound_stream_id = inner.next_outbound_stream_id.checked_add(1)
                .expect("Mplex substream ID overflowed");
            n
        };

        inner.opened_substreams.insert((substream_id, Endpoint::Dialer));

        OutboundSubstream {
            num: substream_id,
            state: OutboundSubstreamState::SendElem(codec::Elem::Open { substream_id }),
        }
    }

    /// Drives the opening of an outbound substream: sends the "open" frame, then
    /// flushes the connection, then yields the substream.
    ///
    /// # Panics
    ///
    /// Panics if polled again after the substream has been yielded.
    fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll<Option<Self::Substream>, IoError> {
        loop {
            let mut inner = self.inner.lock();

            let polling = match substream.state {
                OutboundSubstreamState::SendElem(ref elem) => {
                    poll_send(&mut inner, elem.clone())
                },
                OutboundSubstreamState::Flush => {
                    if inner.is_shutdown {
                        return Err(IoError::new(IoErrorKind::Other, "connection is shut down"))
                    }
                    let inner = &mut *inner;
                    inner.inner.poll_flush_notify(&inner.notifier_write, 0)
                },
                OutboundSubstreamState::Done => {
                    panic!("Polling outbound substream after it's been succesfully open");
                },
            };

            match polling {
                Ok(Async::Ready(())) => (),
                Ok(Async::NotReady) => {
                    // The flush path above does not register the task by itself.
                    inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
                    return Ok(Async::NotReady)
                },
                Err(err) => {
                    debug!("Failed to open outbound substream {}", substream.num);
                    // Drop buffered frames that belong to the substream we failed to open.
                    inner.buffer.retain(|elem| {
                        elem.substream_id() != substream.num || elem.endpoint() == Some(Endpoint::Dialer)
                    });
                    return Err(err)
                },
            };

            // Release the lock before going through the state transition; it is taken
            // again at the top of the loop.
            drop(inner);

            // Going to next step.
            match substream.state {
                OutboundSubstreamState::SendElem(_) => {
                    substream.state = OutboundSubstreamState::Flush;
                },
                OutboundSubstreamState::Flush => {
                    debug!("Successfully opened outbound substream {}", substream.num);
                    substream.state = OutboundSubstreamState::Done;
                    return Ok(Async::Ready(Some(Substream {
                        num: substream.num,
                        current_data: Bytes::new(),
                        endpoint: Endpoint::Dialer,
                    })));
                },
                OutboundSubstreamState::Done => unreachable!(),
            }
        }
    }

    /// Nothing to clean up for an outbound substream attempt.
    #[inline]
    fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
    }

    /// Reads data received on `substream` into `buf`.
    ///
    /// Returns `Ready(0)` when the connection has ended, or when no data is available
    /// and the substream is no longer in the set of open substreams.
    fn read_substream(&self, substream: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> {
        loop {
            // First, hand out whatever is left over from the previous data frame.
            if !substream.current_data.is_empty() {
                let len = cmp::min(substream.current_data.len(), buf.len());
                buf[..len].copy_from_slice(&substream.current_data.split_to(len));
                return Ok(Async::Ready(len));
            }

            // Then look for the next data frame addressed to this substream, i.e. one
            // carrying our id and the endpoint opposite to ours.
            let mut inner = self.inner.lock();
            let next_data_poll = next_match(&mut inner, |elem| {
                match elem {
                    codec::Elem::Data { substream_id, endpoint, data, .. }
                        if *substream_id == substream.num && *endpoint != substream.endpoint =>
                    {
                        Some(data.clone())
                    }
                    _ => None
                }
            });

            // On success, store the payload and let the next loop iteration copy it out.
            match next_data_poll {
                Ok(Async::Ready(Some(data))) => substream.current_data = data,
                Ok(Async::Ready(None)) => return Ok(Async::Ready(0)),
                Ok(Async::NotReady) => {
                    // No data for this substream; if it is no longer open, that is EOF.
                    if inner.opened_substreams.contains(&(substream.num, substream.endpoint)) {
                        return Ok(Async::NotReady)
                    } else {
                        return Ok(Async::Ready(0))
                    }
                },
                Err(err) => return Err(err)
            }
        }
    }

    /// Queues at most `split_send_size` bytes of `buf` as one data frame and returns
    /// how many bytes were taken.
    fn write_substream(&self, substream: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError> {
        let mut inner = self.inner.lock();

        let to_write = cmp::min(buf.len(), inner.config.split_send_size);

        let elem = codec::Elem::Data {
            substream_id: substream.num,
            data: From::from(&buf[..to_write]),
            endpoint: substream.endpoint,
        };

        match poll_send(&mut inner, elem)? {
            Async::Ready(()) => Ok(Async::Ready(to_write)),
            Async::NotReady => Ok(Async::NotReady)
        }
    }

    /// Flushes the whole connection; frames are not flushed per substream.
    fn flush_substream(&self, _substream: &mut Self::Substream) -> Poll<(), IoError> {
        let mut inner = self.inner.lock();
        if inner.is_shutdown {
            return Err(IoError::new(IoErrorKind::Other, "connection is shut down"))
        }

        let inner = &mut *inner;
        match inner.inner.poll_flush_notify(&inner.notifier_write, 0)? {
            Async::Ready(()) => Ok(Async::Ready(())),
            Async::NotReady => {
                inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
                Ok(Async::NotReady)
            }
        }
    }

    /// Sends a "close" frame for the substream. The `Shutdown` kind is ignored.
    fn shutdown_substream(&self, sub: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> {
        let elem = codec::Elem::Close {
            substream_id: sub.num,
            endpoint: sub.endpoint,
        };

        let mut inner = self.inner.lock();
        poll_send(&mut inner, elem)
    }

    /// Discards the buffered frames that were waiting to be read by this substream.
    fn destroy_substream(&self, sub: Self::Substream) {
        self.inner.lock().buffer.retain(|elem| {
            elem.substream_id() != sub.num || elem.endpoint() == Some(sub.endpoint)
        })
    }

    /// Returns `true` once at least one frame has been received from the remote.
    fn is_remote_acknowledged(&self) -> bool {
        self.inner.lock().is_acknowledged
    }

    /// Closes the underlying connection. The `Shutdown` kind is ignored.
    ///
    /// Once this has returned `Ready`, sending and flushing are refused.
    #[inline]
    fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> {
        let inner = &mut *self.inner.lock();
        match inner.inner.close_notify(&inner.notifier_write, 0)? {
            Async::Ready(()) => {
                inner.is_shutdown = true;
                Ok(Async::Ready(()))
            },
            Async::NotReady => {
                // The write notifier only wakes up tasks that registered themselves;
                // without this the caller would not be polled again.
                inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
                Ok(Async::NotReady)
            }
        }
    }

    /// Flushes everything queued on the connection. Succeeds immediately if the
    /// connection has already been shut down.
    #[inline]
    fn flush_all(&self) -> Poll<(), IoError> {
        let inner = &mut *self.inner.lock();
        if inner.is_shutdown {
            return Ok(Async::Ready(()))
        }
        match inner.inner.poll_flush_notify(&inner.notifier_write, 0)? {
            Async::Ready(()) => Ok(Async::Ready(())),
            Async::NotReady => {
                // Same registration as in `flush_substream`: the write notifier only
                // wakes up tasks that registered themselves.
                inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
                Ok(Async::NotReady)
            }
        }
    }
}
/// An outbound substream that is still in the process of being opened.
pub struct OutboundSubstream {
/// Identifier allocated for the substream.
num: u32,
/// How far the opening has progressed.
state: OutboundSubstreamState,
}
// Steps gone through by `poll_outbound`, in order.
enum OutboundSubstreamState {
// The contained frame (the "open" message) still has to be handed to the sink.
SendElem(codec::Elem),
// The frame has been queued; the connection still has to be flushed.
Flush,
// The substream has been yielded; polling again is a bug and panics.
Done,
}
/// An open substream of the multiplexed connection.
pub struct Substream {
/// Identifier of the substream.
num: u32,
/// Bytes of the most recently received data frame that have not been handed to
/// the reader yet.
current_data: Bytes,
/// `Endpoint::Dialer` if the substream was opened locally, `Endpoint::Listener`
/// if it was opened by the remote.
endpoint: Endpoint,
}