mod error;
mod handler;
mod listeners;
mod substream;
pub(crate) mod manager;
pub(crate) mod pool;
pub use error::{ConnectionError, PendingConnectionError};
pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler};
pub use listeners::{ListenerId, ListenersStream, ListenersEvent};
pub use manager::ConnectionId;
pub use substream::{Substream, SubstreamEndpoint, Close};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};
pub use pool::{ConnectionLimits, ConnectionCounters};
use crate::muxing::StreamMuxer;
use crate::{Multiaddr, PeerId};
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
use std::hash::Hash;
use substream::{Muxing, SubstreamEvent};
/// One of the two roles a party can take: [`Endpoint::Dialer`] or
/// [`Endpoint::Listener`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Endpoint {
    /// The dialer role.
    Dialer,
    /// The listener role.
    Listener,
}

/// `!endpoint` yields the opposite role.
impl std::ops::Not for Endpoint {
    type Output = Endpoint;

    /// Maps `Dialer` to `Listener` and `Listener` to `Dialer`.
    fn not(self) -> Self::Output {
        if self == Endpoint::Dialer {
            Endpoint::Listener
        } else {
            Endpoint::Dialer
        }
    }
}

impl Endpoint {
    /// Returns `true` if and only if `self` is [`Endpoint::Dialer`].
    pub fn is_dialer(self) -> bool {
        self == Endpoint::Dialer
    }

    /// Returns `true` if and only if `self` is [`Endpoint::Listener`].
    pub fn is_listener(self) -> bool {
        self == Endpoint::Listener
    }
}
/// Describes which role the local side of a connection has, together with
/// the address(es) recorded for that connection.
#[derive(PartialEq, Eq, Debug, Clone, Hash)]
pub enum ConnectedPoint {
    /// The dialer variant; converts to [`Endpoint::Dialer`].
    Dialer {
        /// The single address stored for this variant. It is the value
        /// returned by `get_remote_address`.
        address: Multiaddr,
    },
    /// The listener variant; converts to [`Endpoint::Listener`].
    Listener {
        /// Address on the local side, going by the field name.
        /// NOTE(review): exact semantics are defined by the code that
        /// constructs this value — confirm there.
        local_addr: Multiaddr,
        /// The address returned by `get_remote_address` for this variant.
        send_back_addr: Multiaddr,
    },
}
impl From<&'_ ConnectedPoint> for Endpoint {
fn from(endpoint: &'_ ConnectedPoint) -> Endpoint {
endpoint.to_endpoint()
}
}
impl From<ConnectedPoint> for Endpoint {
fn from(endpoint: ConnectedPoint) -> Endpoint {
endpoint.to_endpoint()
}
}
impl ConnectedPoint {
    /// Returns the [`Endpoint`] role matching this variant, without the
    /// address data.
    pub fn to_endpoint(&self) -> Endpoint {
        match *self {
            ConnectedPoint::Listener { .. } => Endpoint::Listener,
            ConnectedPoint::Dialer { .. } => Endpoint::Dialer,
        }
    }

    /// Returns `true` if and only if this is the `Dialer` variant.
    pub fn is_dialer(&self) -> bool {
        matches!(self, ConnectedPoint::Dialer { .. })
    }

    /// Returns `true` if and only if this is the `Listener` variant.
    pub fn is_listener(&self) -> bool {
        matches!(self, ConnectedPoint::Listener { .. })
    }

    /// Returns the remote address: `address` for a dialer,
    /// `send_back_addr` for a listener.
    pub fn get_remote_address(&self) -> &Multiaddr {
        match self {
            ConnectedPoint::Listener { send_back_addr: remote, .. } => remote,
            ConnectedPoint::Dialer { address: remote } => remote,
        }
    }

    /// Overwrites the remote address (`address` for a dialer,
    /// `send_back_addr` for a listener) with `new_address`.
    ///
    /// A listener's `local_addr` is left untouched.
    pub fn set_remote_address(&mut self, new_address: Multiaddr) {
        // Pick the field that holds the remote address, then assign once.
        let remote = match self {
            ConnectedPoint::Dialer { address } => address,
            ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
        };
        *remote = new_address;
    }
}
/// Pairs the endpoint description of a connection with a peer identity.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Connected {
    /// Role and address information for the connection.
    pub endpoint: ConnectedPoint,
    /// The peer identity associated with the connection.
    /// NOTE(review): presumably the remote peer — confirm against callers.
    pub peer_id: PeerId,
}
/// The successful outcomes that `Connection::poll` can report.
#[derive(Debug, Clone)]
pub enum Event<T> {
    /// A custom event produced by the connection handler
    /// (`ConnectionHandlerEvent::Custom`).
    Handler(T),
    /// The muxer reported an address change; carries the address it
    /// reported.
    AddressChange(Multiaddr),
}
/// A multiplexed connection driven by a [`ConnectionHandler`].
///
/// Holds the substream multiplexing state next to the handler that consumes
/// the substreams and produces events.
pub struct Connection<TMuxer, THandler>
where
    TMuxer: StreamMuxer,
    THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
    /// Substream multiplexing state wrapping the muxer; outbound substream
    /// requests carry the handler's `OutboundOpenInfo` as user data.
    muxing: substream::Muxing<TMuxer, THandler::OutboundOpenInfo>,
    /// The handler that receives substreams and injected events.
    handler: THandler,
}
/// Debug output lists the `muxing` and `handler` fields; only available
/// when the handler itself implements `Debug`.
impl<TMuxer, THandler> fmt::Debug for Connection<TMuxer, THandler>
where
    TMuxer: StreamMuxer,
    THandler: ConnectionHandler<Substream = Substream<TMuxer>> + fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Connection");
        builder.field("muxing", &self.muxing);
        builder.field("handler", &self.handler);
        builder.finish()
    }
}
/// `Connection` is unconditionally `Unpin`, whatever the muxer and handler
/// types are. This is what lets `poll` reach the fields mutably through
/// `Pin<&mut Self>`.
impl<TMuxer, THandler> Unpin for Connection<TMuxer, THandler>
where
    TMuxer: StreamMuxer,
    THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
}
impl<TMuxer, THandler> Connection<TMuxer, THandler>
where
    TMuxer: StreamMuxer,
    THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
    /// Builds a connection from a stream muxer and the handler that will
    /// drive it.
    pub fn new(muxer: TMuxer, handler: THandler) -> Self {
        let muxing = Muxing::new(muxer);
        Connection { muxing, handler }
    }

    /// Shared access to the connection handler.
    pub fn handler(&self) -> &THandler {
        &self.handler
    }

    /// Exclusive access to the connection handler.
    pub fn handler_mut(&mut self) -> &mut THandler {
        &mut self.handler
    }

    /// Forwards `event` to the handler's `inject_event`.
    pub fn inject_event(&mut self, event: THandler::InEvent) {
        self.handler.inject_event(event);
    }

    /// Consumes the connection and returns the `Close` value obtained from
    /// closing the muxing state; anything else that call returns is
    /// discarded.
    pub fn close(self) -> Close<TMuxer> {
        self.muxing.close().0
    }

    /// Drives the connection: polls the muxer, then the handler, in a loop.
    ///
    /// Returns:
    /// * `Poll::Ready(Ok(Event::AddressChange(_)))` when the muxer reports an
    ///   address change (the handler is informed first);
    /// * `Poll::Ready(Ok(Event::Handler(_)))` when the handler emits a custom
    ///   event;
    /// * `Poll::Ready(Err(ConnectionError::IO(_)))` on a muxer error, without
    ///   polling the handler in that iteration;
    /// * `Poll::Ready(Err(ConnectionError::Handler(_)))` on a handler error;
    /// * `Poll::Pending` only when both the muxer and the handler were
    ///   pending within the same loop iteration.
    pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>>
    {
        loop {
            // Step 1: drive the muxer. New substreams are handed to the
            // handler; `io_pending` records whether the muxer had nothing
            // to report this round.
            let io_pending = match self.muxing.poll(cx) {
                Poll::Pending => true,
                Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
                Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
                    self.handler.inject_address_change(&address);
                    return Poll::Ready(Ok(Event::AddressChange(address)));
                }
                Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => {
                    self.handler.inject_substream(substream, SubstreamEndpoint::Listener);
                    false
                }
                Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { user_data, substream })) => {
                    self.handler.inject_substream(substream, SubstreamEndpoint::Dialer(user_data));
                    false
                }
            };

            // Step 2: drive the handler.
            match self.handler.poll(cx) {
                // Neither side made progress: yield to the executor.
                Poll::Pending if io_pending => return Poll::Pending,
                // The muxer made progress this round, so go around again.
                Poll::Pending => {}
                Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data))) => {
                    self.muxing.open_substream(user_data);
                }
                Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => {
                    return Poll::Ready(Ok(Event::Handler(event)));
                }
                Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
            }
        }
    }
}
/// Borrowed address pair describing an incoming connection.
///
/// Convertible into a `ConnectedPoint::Listener` via `to_connected_point`.
#[derive(Debug, Copy, Clone)]
pub struct IncomingInfo<'a> {
    /// Becomes `local_addr` of the resulting `ConnectedPoint::Listener`.
    pub local_addr: &'a Multiaddr,
    /// Becomes `send_back_addr` of the resulting `ConnectedPoint::Listener`.
    pub send_back_addr: &'a Multiaddr,
}
impl<'a> IncomingInfo<'a> {
pub fn to_connected_point(&self) -> ConnectedPoint {
ConnectedPoint::Listener {
local_addr: self.local_addr.clone(),
send_back_addr: self.send_back_addr.clone(),
}
}
}
/// Borrowed information describing an outgoing connection.
///
/// Convertible into a `ConnectedPoint::Dialer` via `to_connected_point`.
#[derive(Debug, Copy, Clone)]
pub struct OutgoingInfo<'a> {
    /// Becomes `address` of the resulting `ConnectedPoint::Dialer`.
    pub address: &'a Multiaddr,
    /// Optional peer identity; not used by `to_connected_point`.
    /// NOTE(review): presumably the expected remote peer — confirm against
    /// callers.
    pub peer_id: Option<&'a PeerId>,
}
impl<'a> OutgoingInfo<'a> {
    /// Builds an owned `ConnectedPoint::Dialer` by cloning the borrowed
    /// address. `peer_id` plays no part in the result.
    pub fn to_connected_point(&self) -> ConnectedPoint {
        let address = self.address.clone();
        ConnectedPoint::Dialer { address }
    }
}
/// Error value pairing a connection limit with the current count.
///
/// Displays as `current/limit`.
#[derive(Debug, Clone)]
pub struct ConnectionLimit {
    /// The limit; printed after the slash.
    pub limit: u32,
    /// The current count; printed before the slash.
    pub current: u32,
}

impl fmt::Display for ConnectionLimit {
    /// Writes `"{current}/{limit}"`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ConnectionLimit { limit, current } = self;
        write!(f, "{}/{}", current, limit)
    }
}

/// Uses the default `Error` methods; no underlying source is reported.
impl Error for ConnectionLimit {}