use crate::{
ConnectedPoint,
PeerId,
connection::{
self,
Connected,
Connection,
ConnectionId,
ConnectionLimit,
ConnectionError,
ConnectionHandler,
IncomingInfo,
IntoConnectionHandler,
OutgoingInfo,
Substream,
PendingConnectionError,
manager::{self, Manager, ManagerConfig},
},
muxing::StreamMuxer,
};
use either::Either;
use fnv::FnvHashMap;
use futures::prelude::*;
use smallvec::SmallVec;
use std::{convert::TryFrom as _, error, fmt, num::NonZeroU32, task::Context, task::Poll};
pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
local_id: PeerId,
counters: ConnectionCounters,
manager: Manager<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
established: FnvHashMap<PeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,
pending: FnvHashMap<ConnectionId, (ConnectedPoint, Option<PeerId>)>,
disconnected: Vec<Disconnected>,
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Pool")
.field("counters", &self.counters)
.finish()
}
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> Unpin
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {}
pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
ConnectionEstablished {
connection: EstablishedConnection<'a, TInEvent>,
num_established: NonZeroU32,
},
ConnectionClosed {
id: ConnectionId,
connected: Connected,
error: Option<ConnectionError<THandlerErr>>,
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
num_established: u32,
},
PendingConnectionError {
id: ConnectionId,
endpoint: ConnectedPoint,
error: PendingConnectionError<TTransErr>,
handler: Option<THandler>,
peer: Option<PeerId>,
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
},
ConnectionEvent {
connection: EstablishedConnection<'a, TInEvent>,
event: TOutEvent,
},
AddressChange {
connection: EstablishedConnection<'a, TInEvent>,
new_endpoint: ConnectedPoint,
old_endpoint: ConnectedPoint,
},
}
impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
for PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
where
TOutEvent: fmt::Debug,
TTransErr: fmt::Debug,
THandlerErr: fmt::Debug,
TInEvent: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match *self {
PoolEvent::ConnectionEstablished { ref connection, .. } => {
f.debug_tuple("PoolEvent::ConnectionEstablished")
.field(connection)
.finish()
},
PoolEvent::ConnectionClosed { ref id, ref connected, ref error, .. } => {
f.debug_struct("PoolEvent::ConnectionClosed")
.field("id", id)
.field("connected", connected)
.field("error", error)
.finish()
},
PoolEvent::PendingConnectionError { ref id, ref error, .. } => {
f.debug_struct("PoolEvent::PendingConnectionError")
.field("id", id)
.field("error", error)
.finish()
},
PoolEvent::ConnectionEvent { ref connection, ref event } => {
f.debug_struct("PoolEvent::ConnectionEvent")
.field("peer", &connection.peer_id())
.field("event", event)
.finish()
},
PoolEvent::AddressChange { ref connection, ref new_endpoint, ref old_endpoint } => {
f.debug_struct("PoolEvent::AddressChange")
.field("peer", &connection.peer_id())
.field("new_endpoint", new_endpoint)
.field("old_endpoint", old_endpoint)
.finish()
},
}
}
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
{
pub fn new(
local_id: PeerId,
manager_config: ManagerConfig,
limits: ConnectionLimits
) -> Self {
Pool {
local_id,
counters: ConnectionCounters::new(limits),
manager: Manager::new(manager_config),
established: Default::default(),
pending: Default::default(),
disconnected: Vec::new(),
}
}
pub fn counters(&self) -> &ConnectionCounters {
&self.counters
}
pub fn add_incoming<TFut, TMuxer>(
&mut self,
future: TFut,
handler: THandler,
info: IncomingInfo<'_>,
) -> Result<ConnectionId, ConnectionLimit>
where
TFut: Future<
Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
> + Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
self.counters.check_max_pending_incoming()?;
let endpoint = info.to_connected_point();
Ok(self.add_pending(future, handler, endpoint, None))
}
pub fn add_outgoing<TFut, TMuxer>(
&mut self,
future: TFut,
handler: THandler,
info: OutgoingInfo<'_>,
) -> Result<ConnectionId, ConnectionLimit>
where
TFut: Future<
Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
> + Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
self.counters.check_max_pending_outgoing()?;
let endpoint = info.to_connected_point();
Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned()))
}
fn add_pending<TFut, TMuxer>(
&mut self,
future: TFut,
handler: THandler,
endpoint: ConnectedPoint,
peer: Option<PeerId>,
) -> ConnectionId
where
TFut: Future<
Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
> + Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
let future = future.and_then({
let endpoint = endpoint.clone();
let expected_peer = peer;
let local_id = self.local_id;
move |(peer_id, muxer)| {
if let Some(peer) = expected_peer {
if peer != peer_id {
return future::err(PendingConnectionError::InvalidPeerId)
}
}
if local_id == peer_id {
return future::err(PendingConnectionError::InvalidPeerId)
}
let connected = Connected { peer_id, endpoint };
future::ready(Ok((connected, muxer)))
}
});
let id = self.manager.add_pending(future, handler);
self.counters.inc_pending(&endpoint);
self.pending.insert(id, (endpoint, peer));
id
}
pub fn add<TMuxer>(&mut self, c: Connection<TMuxer, THandler::Handler>, i: Connected)
-> Result<ConnectionId, ConnectionLimit>
where
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = connection::Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
self.counters.check_max_established(&i.endpoint)?;
self.counters.check_max_established_per_peer(self.num_peer_established(&i.peer_id))?;
let id = self.manager.add(c, i.clone());
self.counters.inc_established(&i.endpoint);
self.established.entry(i.peer_id).or_default().insert(id, i.endpoint);
Ok(id)
}
pub fn get(&mut self, id: ConnectionId)
-> Option<PoolConnection<'_, TInEvent>>
{
match self.manager.entry(id) {
Some(manager::Entry::Established(entry)) =>
Some(PoolConnection::Established(EstablishedConnection {
entry
})),
Some(manager::Entry::Pending(entry)) =>
Some(PoolConnection::Pending(PendingConnection {
entry,
pending: &mut self.pending,
counters: &mut self.counters,
})),
None => None
}
}
pub fn get_established(&mut self, id: ConnectionId)
-> Option<EstablishedConnection<'_, TInEvent>>
{
match self.get(id) {
Some(PoolConnection::Established(c)) => Some(c),
_ => None
}
}
pub fn get_outgoing(&mut self, id: ConnectionId)
-> Option<PendingConnection<'_, TInEvent>>
{
match self.pending.get(&id) {
Some((ConnectedPoint::Dialer { .. }, _peer)) =>
match self.manager.entry(id) {
Some(manager::Entry::Pending(entry)) =>
Some(PendingConnection {
entry,
pending: &mut self.pending,
counters: &mut self.counters,
}),
_ => unreachable!("by consistency of `self.pending` with `self.manager`")
}
_ => None
}
}
pub fn is_connected(&self, id: &PeerId) -> bool {
self.established.contains_key(id)
}
pub fn num_peers(&self) -> usize {
self.established.len()
}
pub fn disconnect(&mut self, peer: &PeerId) {
if let Some(conns) = self.established.get(peer) {
let mut num_established = 0;
for (&id, endpoint) in conns.iter() {
if let Some(manager::Entry::Established(e)) = self.manager.entry(id) {
let connected = e.remove();
self.disconnected.push(Disconnected {
id, connected, num_established
});
num_established += 1;
}
self.counters.dec_established(endpoint);
}
}
self.established.remove(peer);
let mut aborted = Vec::new();
for (&id, (_endpoint, peer2)) in &self.pending {
if Some(peer) == peer2.as_ref() {
if let Some(manager::Entry::Pending(e)) = self.manager.entry(id) {
e.abort();
aborted.push(id);
}
}
}
for id in aborted {
if let Some((endpoint, _)) = self.pending.remove(&id) {
self.counters.dec_pending(&endpoint);
}
}
}
pub fn num_peer_established(&self, peer: &PeerId) -> u32 {
num_peer_established(&self.established, peer)
}
pub fn iter_peer_established<'a>(&'a mut self, peer: &PeerId)
-> EstablishedConnectionIter<'a,
impl Iterator<Item = ConnectionId>,
TInEvent,
TOutEvent,
THandler,
TTransErr,
THandlerErr>
{
let ids = self.iter_peer_established_info(peer)
.map(|(id, _endpoint)| *id)
.collect::<SmallVec<[ConnectionId; 10]>>()
.into_iter();
EstablishedConnectionIter { pool: self, ids }
}
pub fn iter_pending_incoming(&self) -> impl Iterator<Item = IncomingInfo<'_>> {
self.iter_pending_info()
.filter_map(|(_, ref endpoint, _)| {
match endpoint {
ConnectedPoint::Listener { local_addr, send_back_addr } => {
Some(IncomingInfo { local_addr, send_back_addr })
},
ConnectedPoint::Dialer { .. } => None,
}
})
}
pub fn iter_pending_outgoing(&self) -> impl Iterator<Item = OutgoingInfo<'_>> {
self.iter_pending_info()
.filter_map(|(_, ref endpoint, ref peer_id)| {
match endpoint {
ConnectedPoint::Listener { .. } => None,
ConnectedPoint::Dialer { address } =>
Some(OutgoingInfo { address, peer_id: peer_id.as_ref() }),
}
})
}
pub fn iter_peer_established_info(&self, peer: &PeerId)
-> impl Iterator<Item = (&ConnectionId, &ConnectedPoint)> + fmt::Debug + '_
{
match self.established.get(peer) {
Some(conns) => Either::Left(conns.iter()),
None => Either::Right(std::iter::empty())
}
}
pub fn iter_pending_info(&self)
-> impl Iterator<Item = (&ConnectionId, &ConnectedPoint, &Option<PeerId>)> + '_
{
self.pending.iter().map(|(id, (endpoint, info))| (id, endpoint, info))
}
pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
self.established.keys()
}
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<
PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
> {
if let Some(Disconnected {
id, connected, num_established
}) = self.disconnected.pop() {
return Poll::Ready(PoolEvent::ConnectionClosed {
id,
connected,
num_established,
error: None,
pool: self,
})
}
loop {
let item = match self.manager.poll(cx) {
Poll::Ready(item) => item,
Poll::Pending => return Poll::Pending,
};
match item {
manager::Event::PendingConnectionError { id, error, handler } => {
if let Some((endpoint, peer)) = self.pending.remove(&id) {
self.counters.dec_pending(&endpoint);
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint,
error,
handler: Some(handler),
peer,
pool: self
})
}
},
manager::Event::ConnectionClosed { id, connected, error } => {
let num_established =
if let Some(conns) = self.established.get_mut(&connected.peer_id) {
if let Some(endpoint) = conns.remove(&id) {
self.counters.dec_established(&endpoint);
}
u32::try_from(conns.len()).unwrap()
} else {
0
};
if num_established == 0 {
self.established.remove(&connected.peer_id);
}
return Poll::Ready(PoolEvent::ConnectionClosed {
id, connected, error, num_established, pool: self
})
}
manager::Event::ConnectionEstablished { entry } => {
let id = entry.id();
if let Some((endpoint, peer)) = self.pending.remove(&id) {
self.counters.dec_pending(&endpoint);
if let Err(e) = self.counters.check_max_established(&endpoint) {
let connected = entry.remove();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
error: PendingConnectionError::ConnectionLimit(e),
handler: None,
peer,
pool: self
})
}
let current = num_peer_established(&self.established, &entry.connected().peer_id);
if let Err(e) = self.counters.check_max_established_per_peer(current) {
let connected = entry.remove();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
error: PendingConnectionError::ConnectionLimit(e),
handler: None,
peer,
pool: self
})
}
if cfg!(debug_assertions) {
if self.local_id == entry.connected().peer_id {
panic!("Unexpected local peer ID for remote.");
}
if let Some(peer) = peer {
if peer != entry.connected().peer_id {
panic!("Unexpected peer ID mismatch.");
}
}
}
let peer = entry.connected().peer_id;
let conns = self.established.entry(peer).or_default();
let num_established = NonZeroU32::new(u32::try_from(conns.len() + 1).unwrap())
.expect("n + 1 is always non-zero; qed");
self.counters.inc_established(&endpoint);
conns.insert(id, endpoint);
match self.get(id) {
Some(PoolConnection::Established(connection)) =>
return Poll::Ready(PoolEvent::ConnectionEstablished {
connection, num_established
}),
_ => unreachable!("since `entry` is an `EstablishedEntry`.")
}
}
},
manager::Event::ConnectionEvent { entry, event } => {
let id = entry.id();
match self.get(id) {
Some(PoolConnection::Established(connection)) =>
return Poll::Ready(PoolEvent::ConnectionEvent {
connection,
event,
}),
_ => unreachable!("since `entry` is an `EstablishedEntry`.")
}
},
manager::Event::AddressChange { entry, new_endpoint, old_endpoint } => {
let id = entry.id();
match self.established.get_mut(&entry.connected().peer_id) {
Some(list) => *list.get_mut(&id)
.expect("state inconsistency: entry is `EstablishedEntry` but absent \
from `established`") = new_endpoint.clone(),
None => unreachable!("since `entry` is an `EstablishedEntry`.")
};
match self.get(id) {
Some(PoolConnection::Established(connection)) =>
return Poll::Ready(PoolEvent::AddressChange {
connection,
new_endpoint,
old_endpoint,
}),
_ => unreachable!("since `entry` is an `EstablishedEntry`.")
}
},
}
}
}
}
pub enum PoolConnection<'a, TInEvent> {
Pending(PendingConnection<'a, TInEvent>),
Established(EstablishedConnection<'a, TInEvent>),
}
pub struct PendingConnection<'a, TInEvent> {
entry: manager::PendingEntry<'a, TInEvent>,
pending: &'a mut FnvHashMap<ConnectionId, (ConnectedPoint, Option<PeerId>)>,
counters: &'a mut ConnectionCounters,
}
impl<TInEvent>
PendingConnection<'_, TInEvent>
{
pub fn id(&self) -> ConnectionId {
self.entry.id()
}
pub fn peer_id(&self) -> &Option<PeerId> {
&self.pending.get(&self.entry.id()).expect("`entry` is a pending entry").1
}
pub fn endpoint(&self) -> &ConnectedPoint {
&self.pending.get(&self.entry.id()).expect("`entry` is a pending entry").0
}
pub fn abort(self) {
let endpoint = self.pending.remove(&self.entry.id()).expect("`entry` is a pending entry").0;
self.counters.dec_pending(&endpoint);
self.entry.abort();
}
}
pub struct EstablishedConnection<'a, TInEvent> {
entry: manager::EstablishedEntry<'a, TInEvent>,
}
impl<TInEvent> fmt::Debug
for EstablishedConnection<'_, TInEvent>
where
TInEvent: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("EstablishedConnection")
.field("entry", &self.entry)
.finish()
}
}
impl<TInEvent> EstablishedConnection<'_, TInEvent> {
pub fn connected(&self) -> &Connected {
self.entry.connected()
}
pub fn endpoint(&self) -> &ConnectedPoint {
&self.entry.connected().endpoint
}
pub fn peer_id(&self) -> PeerId {
self.entry.connected().peer_id
}
pub fn id(&self) -> ConnectionId {
self.entry.id()
}
pub fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
self.entry.notify_handler(event)
}
pub fn poll_ready_notify_handler(&mut self, cx: &mut Context<'_>) -> Poll<Result<(),()>> {
self.entry.poll_ready_notify_handler(cx)
}
pub fn start_close(self) {
self.entry.start_close()
}
}
pub struct EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
ids: I
}
impl<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
where
I: Iterator<Item = ConnectionId>
{
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Option<EstablishedConnection<'_, TInEvent>>
{
while let Some(id) = self.ids.next() {
if self.pool.manager.is_established(&id) {
match self.pool.manager.entry(id) {
Some(manager::Entry::Established(entry)) => {
return Some(EstablishedConnection { entry })
}
_ => panic!("Established entry not found in manager.")
}
}
}
None
}
pub fn into_ids(self) -> impl Iterator<Item = ConnectionId> {
self.ids
}
pub fn into_first<'b>(mut self)
-> Option<EstablishedConnection<'b, TInEvent>>
where 'a: 'b
{
while let Some(id) = self.ids.next() {
if self.pool.manager.is_established(&id) {
match self.pool.manager.entry(id) {
Some(manager::Entry::Established(entry)) => {
return Some(EstablishedConnection { entry })
}
_ => panic!("Established entry not found in manager.")
}
}
}
None
}
}
#[derive(Debug, Clone)]
pub struct ConnectionCounters {
limits: ConnectionLimits,
pending_incoming: u32,
pending_outgoing: u32,
established_incoming: u32,
established_outgoing: u32,
}
impl ConnectionCounters {
fn new(limits: ConnectionLimits) -> Self {
Self {
limits,
pending_incoming: 0,
pending_outgoing: 0,
established_incoming: 0,
established_outgoing: 0,
}
}
pub fn limits(&self) -> &ConnectionLimits {
&self.limits
}
pub fn num_connections(&self) -> u32 {
self.num_pending() + self.num_established()
}
pub fn num_pending(&self) -> u32 {
self.pending_incoming + self.pending_outgoing
}
pub fn num_pending_incoming(&self) -> u32 {
self.pending_incoming
}
pub fn num_pending_outgoing(&self) -> u32 {
self.pending_outgoing
}
pub fn num_established_incoming(&self) -> u32 {
self.established_incoming
}
pub fn num_established_outgoing(&self) -> u32 {
self.established_outgoing
}
pub fn num_established(&self) -> u32 {
self.established_outgoing + self.established_incoming
}
fn inc_pending(&mut self, endpoint: &ConnectedPoint) {
match endpoint {
ConnectedPoint::Dialer { .. } => { self.pending_outgoing += 1; }
ConnectedPoint::Listener { .. } => { self.pending_incoming += 1; }
}
}
fn dec_pending(&mut self, endpoint: &ConnectedPoint) {
match endpoint {
ConnectedPoint::Dialer { .. } => { self.pending_outgoing -= 1; }
ConnectedPoint::Listener { .. } => { self.pending_incoming -= 1; }
}
}
fn inc_established(&mut self, endpoint: &ConnectedPoint) {
match endpoint {
ConnectedPoint::Dialer { .. } => { self.established_outgoing += 1; }
ConnectedPoint::Listener { .. } => { self.established_incoming += 1; }
}
}
fn dec_established(&mut self, endpoint: &ConnectedPoint) {
match endpoint {
ConnectedPoint::Dialer { .. } => { self.established_outgoing -= 1; }
ConnectedPoint::Listener { .. } => { self.established_incoming -= 1; }
}
}
fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> {
Self::check(self.pending_outgoing, self.limits.max_pending_outgoing)
}
fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> {
Self::check(self.pending_incoming, self.limits.max_pending_incoming)
}
fn check_max_established(&self, endpoint: &ConnectedPoint)
-> Result<(), ConnectionLimit>
{
match endpoint {
ConnectedPoint::Dialer { .. } =>
Self::check(self.established_outgoing, self.limits.max_established_outgoing),
ConnectedPoint::Listener { .. } => {
Self::check(self.established_incoming, self.limits.max_established_incoming)
}
}
}
fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> {
Self::check(current, self.limits.max_established_per_peer)
}
fn check(current: u32, limit: Option<u32>) -> Result<(), ConnectionLimit> {
if let Some(limit) = limit {
if current >= limit {
return Err(ConnectionLimit { limit, current })
}
}
Ok(())
}
}
fn num_peer_established(
established: &FnvHashMap<PeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,
peer: &PeerId
) -> u32 {
established.get(peer).map_or(0, |conns|
u32::try_from(conns.len())
.expect("Unexpectedly large number of connections for a peer."))
}
#[derive(Debug, Clone, Default)]
pub struct ConnectionLimits {
max_pending_incoming: Option<u32>,
max_pending_outgoing: Option<u32>,
max_established_incoming: Option<u32>,
max_established_outgoing: Option<u32>,
max_established_per_peer: Option<u32>,
}
impl ConnectionLimits {
pub fn with_max_pending_incoming(mut self, limit: Option<u32>) -> Self {
self.max_pending_incoming = limit;
self
}
pub fn with_max_pending_outgoing(mut self, limit: Option<u32>) -> Self {
self.max_pending_outgoing = limit;
self
}
pub fn with_max_established_incoming(mut self, limit: Option<u32>) -> Self {
self.max_established_incoming = limit;
self
}
pub fn with_max_established_outgoing(mut self, limit: Option<u32>) -> Self {
self.max_established_outgoing = limit;
self
}
pub fn with_max_established_per_peer(mut self, limit: Option<u32>) -> Self {
self.max_established_per_peer = limit;
self
}
}
struct Disconnected {
id: ConnectionId,
connected: Connected,
num_established: u32,
}