pub use crate::upgrade::Version;
use crate::{
ConnectedPoint,
Negotiated,
transport::{
Transport,
TransportError,
ListenerEvent,
and_then::AndThen,
boxed::boxed,
timeout::TransportTimeout,
},
muxing::{StreamMuxer, StreamMuxerBox},
upgrade::{
self,
OutboundUpgrade,
InboundUpgrade,
apply_inbound,
apply_outbound,
UpgradeError,
OutboundUpgradeApply,
InboundUpgradeApply
},
PeerId
};
use futures::{prelude::*, ready};
use multiaddr::Multiaddr;
use std::{
error::Error,
fmt,
pin::Pin,
task::{Context, Poll},
time::Duration
};
#[derive(Clone)]
pub struct Builder<T> {
inner: T,
version: upgrade::Version,
}
impl<T> Builder<T>
where
T: Transport,
T::Error: 'static,
{
pub fn new(inner: T, version: upgrade::Version) -> Builder<T> {
Builder { inner, version }
}
pub fn authenticate<C, D, U, E>(self, upgrade: U) -> Authenticated<
AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>
> where
T: Transport<Output = C>,
C: AsyncRead + AsyncWrite + Unpin,
D: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E> + Clone,
E: Error + 'static,
{
let version = self.version;
Authenticated(Builder::new(self.inner.and_then(move |conn, endpoint| {
Authenticate {
inner: upgrade::apply(conn, upgrade, endpoint, version)
}
}), version))
}
}
#[pin_project::pin_project]
pub struct Authenticate<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>
{
#[pin]
inner: EitherUpgrade<C, U>
}
impl<C, U> Future for Authenticate<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>,
Output = <U as InboundUpgrade<Negotiated<C>>>::Output,
Error = <U as InboundUpgrade<Negotiated<C>>>::Error
>
{
type Output = <EitherUpgrade<C, U> as Future>::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
Future::poll(this.inner, cx)
}
}
#[pin_project::pin_project]
pub struct Multiplex<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
{
peer_id: Option<PeerId>,
#[pin]
upgrade: EitherUpgrade<C, U>,
}
impl<C, U, M, E> Future for Multiplex<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E>
{
type Output = Result<(PeerId, M), UpgradeError<E>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let m = match ready!(Future::poll(this.upgrade, cx)) {
Ok(m) => m,
Err(err) => return Poll::Ready(Err(err)),
};
let i = this.peer_id.take().expect("Multiplex future polled after completion.");
Poll::Ready(Ok((i, m)))
}
}
#[derive(Clone)]
pub struct Authenticated<T>(Builder<T>);
impl<T> Authenticated<T>
where
T: Transport,
T::Error: 'static
{
pub fn apply<C, D, U, E>(self, upgrade: U) -> Authenticated<Upgrade<T, U>>
where
T: Transport<Output = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
D: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
E: Error + 'static,
{
Authenticated(Builder::new(Upgrade::new(self.0.inner, upgrade), self.0.version))
}
pub fn multiplex<C, M, U, E>(self, upgrade: U) -> Multiplexed<
AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>
> where
T: Transport<Output = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
{
let version = self.0.version;
Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, upgrade, endpoint, version);
Multiplex { peer_id: Some(i), upgrade }
}))
}
pub fn multiplex_ext<C, M, U, E, F>(self, up: F) -> Multiplexed<
AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>
> where
T: Transport<Output = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone
{
let version = self.0.version;
Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| {
let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), endpoint, version);
Multiplex { peer_id: Some(peer_id), upgrade }
}))
}
}
#[derive(Clone)]
pub struct Multiplexed<T>(T);
impl<T> Multiplexed<T> {
pub fn boxed<M>(self) -> super::Boxed<(PeerId, StreamMuxerBox)>
where
T: Transport<Output = (PeerId, M)> + Sized + Clone + Send + Sync + 'static,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
T::Error: Send + Sync,
M: StreamMuxer + Send + Sync + 'static,
M::Substream: Send + 'static,
M::OutboundSubstream: Send + 'static
{
boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))
}
pub fn timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
Multiplexed(TransportTimeout::new(self.0, timeout))
}
pub fn outbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
Multiplexed(TransportTimeout::with_outgoing_timeout(self.0, timeout))
}
pub fn inbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
Multiplexed(TransportTimeout::with_ingoing_timeout(self.0, timeout))
}
}
impl<T> Transport for Multiplexed<T>
where
T: Transport,
{
type Output = T::Output;
type Error = T::Error;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = T::Dial;
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.0.dial(addr)
}
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
self.0.listen_on(addr)
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.0.address_translation(server, observed)
}
}
type EitherUpgrade<C, U> = future::Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>;
#[derive(Debug, Copy, Clone)]
pub struct Upgrade<T, U> { inner: T, upgrade: U }
impl<T, U> Upgrade<T, U> {
pub fn new(inner: T, upgrade: U) -> Self {
Upgrade { inner, upgrade }
}
}
impl<T, C, D, U, E> Transport for Upgrade<T, U>
where
T: Transport<Output = (PeerId, C)>,
T::Error: 'static,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
E: Error + 'static
{
type Output = (PeerId, D);
type Error = TransportUpgradeError<T::Error, E>;
type Listener = ListenerStream<T::Listener, U>;
type ListenerUpgrade = ListenerUpgradeFuture<T::ListenerUpgrade, U, C>;
type Dial = DialUpgradeFuture<T::Dial, U, C>;
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self.inner.dial(addr)
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok(DialUpgradeFuture {
future: Box::pin(future),
upgrade: future::Either::Left(Some(self.upgrade))
})
}
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let stream = self.inner.listen_on(addr)
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok(ListenerStream {
stream: Box::pin(stream),
upgrade: self.upgrade
})
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
}
#[derive(Debug)]
pub enum TransportUpgradeError<T, U> {
Transport(T),
Upgrade(UpgradeError<U>),
}
impl<T, U> fmt::Display for TransportUpgradeError<T, U>
where
T: fmt::Display,
U: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TransportUpgradeError::Transport(e) => write!(f, "Transport error: {}", e),
TransportUpgradeError::Upgrade(e) => write!(f, "Upgrade error: {}", e),
}
}
}
impl<T, U> Error for TransportUpgradeError<T, U>
where
T: Error + 'static,
U: Error + 'static,
{
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
TransportUpgradeError::Transport(e) => Some(e),
TransportUpgradeError::Upgrade(e) => Some(e),
}
}
}
pub struct DialUpgradeFuture<F, U, C>
where
U: OutboundUpgrade<Negotiated<C>>,
C: AsyncRead + AsyncWrite + Unpin,
{
future: Pin<Box<F>>,
upgrade: future::Either<Option<U>, (Option<PeerId>, OutboundUpgradeApply<C, U>)>
}
impl<F, U, C, D> Future for DialUpgradeFuture<F, U, C>
where
F: TryFuture<Ok = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundUpgrade<Negotiated<C>, Output = D>,
U::Error: Error
{
type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
loop {
this.upgrade = match this.upgrade {
future::Either::Left(ref mut up) => {
let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx).map_err(TransportUpgradeError::Transport)) {
Ok(v) => v,
Err(err) => return Poll::Ready(Err(err)),
};
let u = up.take().expect("DialUpgradeFuture is constructed with Either::Left(Some).");
future::Either::Right((Some(i), apply_outbound(c, u, upgrade::Version::V1)))
}
future::Either::Right((ref mut i, ref mut up)) => {
let d = match ready!(Future::poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)) {
Ok(d) => d,
Err(err) => return Poll::Ready(Err(err)),
};
let i = i.take().expect("DialUpgradeFuture polled after completion.");
return Poll::Ready(Ok((i, d)))
}
}
}
}
}
impl<F, U, C> Unpin for DialUpgradeFuture<F, U, C>
where
U: OutboundUpgrade<Negotiated<C>>,
C: AsyncRead + AsyncWrite + Unpin,
{
}
pub struct ListenerStream<S, U> {
stream: Pin<Box<S>>,
upgrade: U
}
impl<S, U, F, C, D, E> Stream for ListenerStream<S, U>
where
S: TryStream<Ok = ListenerEvent<F, E>, Error = E>,
F: TryFuture<Ok = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D> + Clone
{
type Item = Result<ListenerEvent<ListenerUpgradeFuture<F, U, C>, TransportUpgradeError<E, U::Error>>, TransportUpgradeError<E, U::Error>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(TryStream::try_poll_next(self.stream.as_mut(), cx)) {
Some(Ok(event)) => {
let event = event
.map(move |future| {
ListenerUpgradeFuture {
future: Box::pin(future),
upgrade: future::Either::Left(Some(self.upgrade.clone()))
}
})
.map_err(TransportUpgradeError::Transport);
Poll::Ready(Some(Ok(event)))
}
Some(Err(err)) => {
Poll::Ready(Some(Err(TransportUpgradeError::Transport(err))))
}
None => Poll::Ready(None)
}
}
}
impl<S, U> Unpin for ListenerStream<S, U> {
}
pub struct ListenerUpgradeFuture<F, U, C>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>>
{
future: Pin<Box<F>>,
upgrade: future::Either<Option<U>, (Option<PeerId>, InboundUpgradeApply<C, U>)>
}
impl<F, U, C, D> Future for ListenerUpgradeFuture<F, U, C>
where
F: TryFuture<Ok = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D>,
U::Error: Error
{
type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
loop {
this.upgrade = match this.upgrade {
future::Either::Left(ref mut up) => {
let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx).map_err(TransportUpgradeError::Transport)) {
Ok(v) => v,
Err(err) => return Poll::Ready(Err(err))
};
let u = up.take().expect("ListenerUpgradeFuture is constructed with Either::Left(Some).");
future::Either::Right((Some(i), apply_inbound(c, u)))
}
future::Either::Right((ref mut i, ref mut up)) => {
let d = match ready!(TryFuture::try_poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)) {
Ok(v) => v,
Err(err) => return Poll::Ready(Err(err))
};
let i = i.take().expect("ListenerUpgradeFuture polled after completion.");
return Poll::Ready(Ok((i, d)))
}
}
}
}
}
impl<F, U, C> Unpin for ListenerUpgradeFuture<F, U, C>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>>
{
}