mod event;
pub mod peer;
pub use crate::connection::{ConnectionLimits, ConnectionCounters};
pub use event::{NetworkEvent, IncomingConnection};
pub use peer::Peer;
use crate::{
ConnectedPoint,
Executor,
Multiaddr,
PeerId,
connection::{
ConnectionId,
ConnectionLimit,
ConnectionHandler,
IntoConnectionHandler,
IncomingInfo,
OutgoingInfo,
ListenersEvent,
ListenerId,
ListenersStream,
PendingConnectionError,
Substream,
manager::ManagerConfig,
pool::{Pool, PoolEvent},
},
muxing::StreamMuxer,
transport::{Transport, TransportError},
};
use fnv::{FnvHashMap};
use futures::{prelude::*, future};
use smallvec::SmallVec;
use std::{
collections::hash_map,
convert::TryFrom as _,
error,
fmt,
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll},
};
pub struct Network<TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
local_peer_id: PeerId,
listeners: ListenersStream<TTrans>,
pool: Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error>,
dialing: FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
}
impl<TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
Network<TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: fmt::Debug + Transport,
THandler: fmt::Debug + ConnectionHandler,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("ReachAttempts")
.field("local_peer_id", &self.local_peer_id)
.field("listeners", &self.listeners)
.field("peers", &self.pool)
.field("dialing", &self.dialing)
.finish()
}
}
impl<TTrans, TInEvent, TOutEvent, THandler> Unpin for
Network<TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
}
impl<TTrans, TInEvent, TOutEvent, THandler>
Network<TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
fn disconnect(&mut self, peer: &PeerId) {
self.pool.disconnect(peer);
self.dialing.remove(peer);
}
}
impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler>
Network<TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport + Clone,
TMuxer: StreamMuxer,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send,
{
pub fn new(
transport: TTrans,
local_peer_id: PeerId,
config: NetworkConfig,
) -> Self {
Network {
local_peer_id,
listeners: ListenersStream::new(transport),
pool: Pool::new(local_peer_id, config.manager_config, config.limits),
dialing: Default::default(),
}
}
pub fn transport(&self) -> &TTrans {
self.listeners.transport()
}
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<TTrans::Error>> {
self.listeners.listen_on(addr)
}
pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
self.listeners.remove_listener(id)
}
pub fn listen_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
self.listeners.listen_addrs()
}
pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr)
-> impl Iterator<Item = Multiaddr> + 'a
where
TMuxer: 'a,
THandler: 'a,
{
let transport = self.listeners.transport();
let mut addrs: Vec<_> = self.listen_addrs()
.filter_map(move |server| transport.address_translation(server, observed_addr))
.collect();
addrs.sort_unstable();
addrs.dedup();
addrs.into_iter()
}
pub fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
}
pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
-> Result<ConnectionId, ConnectionLimit>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
let info = OutgoingInfo { address, peer_id: None };
match self.transport().clone().dial(address.clone()) {
Ok(f) => {
let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
self.pool.add_outgoing(f, handler, info)
}
Err(err) => {
let f = future::err(PendingConnectionError::Transport(err));
self.pool.add_outgoing(f, handler, info)
}
}
}
pub fn info(&self) -> NetworkInfo {
let num_peers = self.pool.num_peers();
let connection_counters = self.pool.counters().clone();
NetworkInfo {
num_peers,
connection_counters,
}
}
pub fn incoming_info(&self) -> impl Iterator<Item = IncomingInfo<'_>> {
self.pool.iter_pending_incoming()
}
pub fn unknown_dials(&self) -> impl Iterator<Item = &Multiaddr> {
self.pool.iter_pending_outgoing()
.filter_map(|info| {
if info.peer_id.is_none() {
Some(info.address)
} else {
None
}
})
}
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.pool.iter_connected()
}
pub fn is_connected(&self, peer: &PeerId) -> bool {
self.pool.is_connected(peer)
}
pub fn is_dialing(&self, peer: &PeerId) -> bool {
self.dialing.contains_key(peer)
}
pub fn is_disconnected(&self, peer: &PeerId) -> bool {
!self.is_connected(peer) && !self.is_dialing(peer)
}
pub fn dialing_peers(&self) -> impl Iterator<Item = &PeerId> {
self.dialing.keys()
}
pub fn peer(&mut self, peer_id: PeerId)
-> Peer<'_, TTrans, TInEvent, TOutEvent, THandler>
{
Peer::new(self, peer_id)
}
pub fn accept(
&mut self,
connection: IncomingConnection<TTrans::ListenerUpgrade>,
handler: THandler,
) -> Result<ConnectionId, ConnectionLimit>
where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
{
let upgrade = connection.upgrade.map_err(|err|
PendingConnectionError::Transport(TransportError::Other(err)));
let info = IncomingInfo {
local_addr: &connection.local_addr,
send_back_addr: &connection.send_back_addr,
};
self.pool.add_incoming(upgrade, handler, info)
}
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
{
match ListenersStream::poll(Pin::new(&mut self.listeners), cx) {
Poll::Pending => (),
Poll::Ready(ListenersEvent::Incoming {
listener_id,
upgrade,
local_addr,
send_back_addr
}) => {
return Poll::Ready(NetworkEvent::IncomingConnection {
listener_id,
connection: IncomingConnection {
upgrade,
local_addr,
send_back_addr,
}
})
}
Poll::Ready(ListenersEvent::NewAddress { listener_id, listen_addr }) => {
return Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr })
}
Poll::Ready(ListenersEvent::AddressExpired { listener_id, listen_addr }) => {
return Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr })
}
Poll::Ready(ListenersEvent::Closed { listener_id, addresses, reason }) => {
return Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason })
}
Poll::Ready(ListenersEvent::Error { listener_id, error }) => {
return Poll::Ready(NetworkEvent::ListenerError { listener_id, error })
}
}
let event = match self.pool.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(PoolEvent::ConnectionEstablished { connection, num_established }) => {
if let hash_map::Entry::Occupied(mut e) = self.dialing.entry(connection.peer_id()) {
e.get_mut().retain(|s| s.current.0 != connection.id());
if e.get().is_empty() {
e.remove();
}
}
NetworkEvent::ConnectionEstablished {
connection,
num_established,
}
}
Poll::Ready(PoolEvent::PendingConnectionError { id, endpoint, error, handler, pool, .. }) => {
let dialing = &mut self.dialing;
let (next, event) = on_connection_failed(dialing, id, endpoint, error, handler);
if let Some(dial) = next {
let transport = self.listeners.transport().clone();
if let Err(e) = dial_peer_impl(transport, pool, dialing, dial) {
log::warn!("Dialing aborted: {:?}", e);
}
}
event
}
Poll::Ready(PoolEvent::ConnectionClosed { id, connected, error, num_established, .. }) => {
NetworkEvent::ConnectionClosed {
id,
connected,
num_established,
error,
}
}
Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => {
NetworkEvent::ConnectionEvent {
connection,
event,
}
}
Poll::Ready(PoolEvent::AddressChange { connection, new_endpoint, old_endpoint }) => {
NetworkEvent::AddressChange {
connection,
new_endpoint,
old_endpoint,
}
}
};
Poll::Ready(event)
}
fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>)
-> Result<ConnectionId, ConnectionLimit>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Dial: Send + 'static,
TTrans::Error: Send + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
dial_peer_impl(self.transport().clone(), &mut self.pool, &mut self.dialing, opts)
}
}
struct DialingOpts<PeerId, THandler> {
peer: PeerId,
handler: THandler,
address: Multiaddr,
remaining: Vec<Multiaddr>,
}
fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans>(
transport: TTrans,
pool: &mut Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error>,
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<PeerId, THandler>
) -> Result<ConnectionId, ConnectionLimit>
where
THandler: IntoConnectionHandler + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
> + Send + 'static,
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Dial: Send + 'static,
TTrans::Error: error::Error + Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
let result = match transport.dial(opts.address.clone()) {
Ok(fut) => {
let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e)));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info)
},
Err(err) => {
let fut = future::err(PendingConnectionError::Transport(err));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info)
},
};
if let Ok(id) = &result {
dialing.entry(opts.peer).or_default().push(
peer::DialingState {
current: (*id, opts.address),
remaining: opts.remaining,
},
);
}
result
}
fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler>(
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
id: ConnectionId,
endpoint: ConnectedPoint,
error: PendingConnectionError<TTrans::Error>,
handler: Option<THandler>,
) -> (Option<DialingOpts<PeerId, THandler>>, NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>)
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
let dialing_failed = dialing.iter_mut()
.find_map(|(peer, attempts)| {
if let Some(pos) = attempts.iter().position(|s| s.current.0 == id) {
let attempt = attempts.remove(pos);
let last = attempts.is_empty();
Some((*peer, attempt, last))
} else {
None
}
});
if let Some((peer_id, mut attempt, last)) = dialing_failed {
if last {
dialing.remove(&peer_id);
}
let num_remain = u32::try_from(attempt.remaining.len()).unwrap();
let failed_addr = attempt.current.1.clone();
let (opts, attempts_remaining) =
if num_remain > 0 {
if let Some(handler) = handler {
let next_attempt = attempt.remaining.remove(0);
let opts = DialingOpts {
peer: peer_id,
handler,
address: next_attempt,
remaining: attempt.remaining
};
(Some(opts), num_remain)
} else {
(None, 0)
}
} else {
(None, 0)
};
(opts, NetworkEvent::DialError {
attempts_remaining,
peer_id,
multiaddr: failed_addr,
error,
})
} else {
match endpoint {
ConnectedPoint::Dialer { address } =>
(None, NetworkEvent::UnknownPeerDialError {
multiaddr: address,
error,
}),
ConnectedPoint::Listener { local_addr, send_back_addr } =>
(None, NetworkEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error
})
}
}
}
#[derive(Clone, Debug)]
pub struct NetworkInfo {
num_peers: usize,
connection_counters: ConnectionCounters,
}
impl NetworkInfo {
pub fn num_peers(&self) -> usize {
self.num_peers
}
pub fn connection_counters(&self) -> &ConnectionCounters {
&self.connection_counters
}
}
#[derive(Default)]
pub struct NetworkConfig {
manager_config: ManagerConfig,
limits: ConnectionLimits,
}
impl NetworkConfig {
pub fn with_executor(mut self, e: Box<dyn Executor + Send>) -> Self {
self.manager_config.executor = Some(e);
self
}
pub fn or_else_with_executor<F>(mut self, f: F) -> Self
where
F: FnOnce() -> Option<Box<dyn Executor + Send>>
{
self.manager_config.executor = self.manager_config.executor.or_else(f);
self
}
pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
self.manager_config.task_command_buffer_size = n.get() - 1;
self
}
pub fn with_connection_event_buffer_size(mut self, n: usize) -> Self {
self.manager_config.task_event_buffer_size = n;
self
}
pub fn with_connection_limits(mut self, limits: ConnectionLimits) -> Self {
self.limits = limits;
self
}
}
#[cfg(test)]
mod tests {
use super::*;
struct Dummy;
impl Executor for Dummy {
fn exec(&self, _: Pin<Box<dyn Future<Output=()> + Send>>) { }
}
#[test]
fn set_executor() {
NetworkConfig::default()
.with_executor(Box::new(Dummy))
.with_executor(Box::new(|f| {
async_std::task::spawn(f);
}));
}
}