mod behaviour;
mod registry;
#[cfg(test)]
mod test;
mod upgrade;
pub mod protocols_handler;
pub mod toggle;
pub use behaviour::{
NetworkBehaviour,
NetworkBehaviourAction,
NetworkBehaviourEventProcess,
PollParameters,
NotifyHandler,
DialPeerCondition
};
pub use protocols_handler::{
IntoProtocolsHandler,
IntoProtocolsHandlerSelect,
KeepAlive,
ProtocolsHandler,
ProtocolsHandlerEvent,
ProtocolsHandlerSelect,
ProtocolsHandlerUpgrErr,
OneShotHandler,
OneShotHandlerConfig,
SubstreamProtocol
};
pub use registry::{AddressScore, AddressRecord, AddAddressResult};
use protocols_handler::{
NodeHandlerWrapperBuilder,
NodeHandlerWrapperError,
};
use futures::{
prelude::*,
executor::ThreadPoolBuilder,
stream::FusedStream,
};
use libp2p_core::{
Executor,
Transport,
Multiaddr,
Negotiated,
PeerId,
connection::{
ConnectionError,
ConnectionId,
ConnectionLimit,
ConnectedPoint,
EstablishedConnection,
IntoConnectionHandler,
ListenerId,
PendingConnectionError,
Substream
},
transport::{self, TransportError},
muxing::StreamMuxerBox,
network::{
ConnectionLimits,
Network,
NetworkInfo,
NetworkEvent,
NetworkConfig,
peer::ConnectedPeer,
},
upgrade::{ProtocolName},
};
use registry::{Addresses, AddressIntoIter};
use smallvec::SmallVec;
use std::{error, fmt, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
use std::collections::HashSet;
use std::num::{NonZeroU32, NonZeroUsize};
use upgrade::UpgradeInfoSend as _;
pub type Swarm<TBehaviour> = ExpandedSwarm<
TBehaviour,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
<TBehaviour as NetworkBehaviour>::ProtocolsHandler,
>;
pub type NegotiatedSubstream = Negotiated<Substream<StreamMuxerBox>>;
#[derive(Debug)]
pub enum SwarmEvent<TBvEv, THandleErr> {
Behaviour(TBvEv),
ConnectionEstablished {
peer_id: PeerId,
endpoint: ConnectedPoint,
num_established: NonZeroU32,
},
ConnectionClosed {
peer_id: PeerId,
endpoint: ConnectedPoint,
num_established: u32,
cause: Option<ConnectionError<NodeHandlerWrapperError<THandleErr>>>,
},
IncomingConnection {
local_addr: Multiaddr,
send_back_addr: Multiaddr,
},
IncomingConnectionError {
local_addr: Multiaddr,
send_back_addr: Multiaddr,
error: PendingConnectionError<io::Error>,
},
BannedPeer {
peer_id: PeerId,
endpoint: ConnectedPoint,
},
UnreachableAddr {
peer_id: PeerId,
address: Multiaddr,
error: PendingConnectionError<io::Error>,
attempts_remaining: u32,
},
UnknownPeerUnreachableAddr {
address: Multiaddr,
error: PendingConnectionError<io::Error>,
},
NewListenAddr(Multiaddr),
ExpiredListenAddr(Multiaddr),
ListenerClosed {
addresses: Vec<Multiaddr>,
reason: Result<(), io::Error>,
},
ListenerError {
error: io::Error,
},
Dialing(PeerId),
}
pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where
THandler: IntoProtocolsHandler,
{
network: Network<
transport::Boxed<(PeerId, StreamMuxerBox)>,
TInEvent,
TOutEvent,
NodeHandlerWrapperBuilder<THandler>,
>,
behaviour: TBehaviour,
supported_protocols: SmallVec<[Vec<u8>; 16]>,
listened_addrs: SmallVec<[Multiaddr; 8]>,
external_addrs: Addresses,
banned_peers: HashSet<PeerId>,
pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>,
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
}
impl<TBehaviour, TInEvent, TOutEvent, THandler> Deref for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where
THandler: IntoProtocolsHandler,
{
type Target = TBehaviour;
fn deref(&self) -> &Self::Target {
&self.behaviour
}
}
impl<TBehaviour, TInEvent, TOutEvent, THandler> DerefMut for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where
THandler: IntoProtocolsHandler,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.behaviour
}
}
impl<TBehaviour, TInEvent, TOutEvent, THandler> Unpin for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where
THandler: IntoProtocolsHandler,
{
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandleErr>
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoProtocolsHandler + Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
THandleErr: error::Error + Send + 'static,
{
pub fn new(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId
) -> Self {
SwarmBuilder::new(transport, behaviour, local_peer_id).build()
}
pub fn network_info(me: &Self) -> NetworkInfo {
me.network.info()
}
pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
me.network.listen_on(addr)
}
pub fn remove_listener(me: &mut Self, id: ListenerId) -> Result<(), ()> {
me.network.remove_listener(id)
}
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
let handler = me.behaviour.new_handler()
.into_node_handler_builder()
.with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
me.network.dial(&addr, handler).map(|_id| ())
}
pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<(), DialError> {
if me.banned_peers.contains(peer_id) {
me.behaviour.inject_dial_failure(peer_id);
return Err(DialError::Banned)
}
let self_listening = &me.listened_addrs;
let mut addrs = me.behaviour.addresses_of_peer(peer_id)
.into_iter()
.filter(|a| !self_listening.contains(a));
let result =
if let Some(first) = addrs.next() {
let handler = me.behaviour.new_handler()
.into_node_handler_builder()
.with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
me.network.peer(peer_id.clone())
.dial(first, addrs, handler)
.map(|_| ())
.map_err(DialError::ConnectionLimit)
} else {
Err(DialError::NoAddresses)
};
if let Err(error) = &result {
log::debug!(
"New dialing attempt to peer {:?} failed: {:?}.",
peer_id, error);
me.behaviour.inject_dial_failure(&peer_id);
}
result
}
pub fn listeners(me: &Self) -> impl Iterator<Item = &Multiaddr> {
me.network.listen_addrs()
}
pub fn local_peer_id(me: &Self) -> &PeerId {
&me.network.local_peer_id()
}
pub fn external_addresses(me: &Self) -> impl Iterator<Item = &AddressRecord> {
me.external_addrs.iter()
}
pub fn add_external_address(me: &mut Self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
me.external_addrs.add(a, s)
}
pub fn remove_external_address(me: &mut Self, addr: &Multiaddr) -> bool {
me.external_addrs.remove(addr)
}
pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) {
if me.banned_peers.insert(peer_id.clone()) {
if let Some(peer) = me.network.peer(peer_id).into_connected() {
peer.disconnect();
}
}
}
pub fn unban_peer_id(me: &mut Self, peer_id: PeerId) {
me.banned_peers.remove(&peer_id);
}
pub fn is_connected(me: &Self, peer_id: &PeerId) -> bool {
me.network.is_connected(peer_id)
}
pub async fn next_event(&mut self) -> SwarmEvent<TBehaviour::OutEvent, THandleErr> {
future::poll_fn(move |cx| ExpandedSwarm::poll_next_event(Pin::new(self), cx)).await
}
pub async fn next(&mut self) -> TBehaviour::OutEvent {
future::poll_fn(move |cx| {
loop {
let event = futures::ready!(ExpandedSwarm::poll_next_event(Pin::new(self), cx));
if let SwarmEvent::Behaviour(event) = event {
return Poll::Ready(event);
}
}
}).await
}
fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<SwarmEvent<TBehaviour::OutEvent, THandleErr>>
{
let this = &mut *self;
loop {
let mut network_not_ready = false;
match this.network.poll(cx) {
Poll::Pending => network_not_ready = true,
Poll::Ready(NetworkEvent::ConnectionEvent { connection, event }) => {
let peer = connection.peer_id().clone();
let connection = connection.id();
this.behaviour.inject_event(peer, connection, event);
},
Poll::Ready(NetworkEvent::AddressChange { connection, new_endpoint, old_endpoint }) => {
let peer = connection.peer_id();
let connection = connection.id();
this.behaviour.inject_address_change(&peer, &connection, &old_endpoint, &new_endpoint);
},
Poll::Ready(NetworkEvent::ConnectionEstablished { connection, num_established }) => {
let peer_id = connection.peer_id().clone();
let endpoint = connection.endpoint().clone();
if this.banned_peers.contains(&peer_id) {
this.network.peer(peer_id.clone())
.into_connected()
.expect("the Network just notified us that we were connected; QED")
.disconnect();
return Poll::Ready(SwarmEvent::BannedPeer {
peer_id,
endpoint,
});
} else {
log::debug!("Connection established: {:?}; Total (peer): {}.",
connection.connected(), num_established);
let endpoint = connection.endpoint().clone();
this.behaviour.inject_connection_established(&peer_id, &connection.id(), &endpoint);
if num_established.get() == 1 {
this.behaviour.inject_connected(&peer_id);
}
return Poll::Ready(SwarmEvent::ConnectionEstablished {
peer_id, num_established, endpoint
});
}
},
Poll::Ready(NetworkEvent::ConnectionClosed { id, connected, error, num_established }) => {
if let Some(error) = error.as_ref() {
log::debug!("Connection {:?} closed: {:?}", connected, error);
} else {
log::debug!("Connection {:?} closed (active close).", connected);
}
let peer_id = connected.peer_id;
let endpoint = connected.endpoint;
this.behaviour.inject_connection_closed(&peer_id, &id, &endpoint);
if num_established == 0 {
this.behaviour.inject_disconnected(&peer_id);
}
return Poll::Ready(SwarmEvent::ConnectionClosed {
peer_id,
endpoint,
cause: error,
num_established,
});
},
Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => {
let handler = this.behaviour.new_handler()
.into_node_handler_builder()
.with_substream_upgrade_protocol_override(this.substream_upgrade_protocol_override);
let local_addr = connection.local_addr.clone();
let send_back_addr = connection.send_back_addr.clone();
if let Err(e) = this.network.accept(connection, handler) {
log::warn!("Incoming connection rejected: {:?}", e);
}
return Poll::Ready(SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
});
},
Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr }) => {
log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
if !this.listened_addrs.contains(&listen_addr) {
this.listened_addrs.push(listen_addr.clone())
}
this.behaviour.inject_new_listen_addr(&listen_addr);
return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
}
Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => {
log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr);
this.listened_addrs.retain(|a| a != &listen_addr);
this.behaviour.inject_expired_listen_addr(&listen_addr);
return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
}
Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
for addr in addresses.iter() {
this.behaviour.inject_expired_listen_addr(addr);
}
this.behaviour.inject_listener_closed(listener_id, match &reason {
Ok(()) => Ok(()),
Err(err) => Err(err),
});
return Poll::Ready(SwarmEvent::ListenerClosed {
addresses,
reason,
});
}
Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) => {
this.behaviour.inject_listener_error(listener_id, &error);
return Poll::Ready(SwarmEvent::ListenerError {
error,
});
},
Poll::Ready(NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => {
log::debug!("Incoming connection failed: {:?}", error);
return Poll::Ready(SwarmEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
});
},
Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, attempts_remaining }) => {
log::debug!(
"Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.",
peer_id, multiaddr, error, attempts_remaining);
this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);
if attempts_remaining == 0 {
this.behaviour.inject_dial_failure(&peer_id);
}
return Poll::Ready(SwarmEvent::UnreachableAddr {
peer_id,
address: multiaddr,
error,
attempts_remaining,
});
},
Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error, .. }) => {
log::debug!("Connection attempt to address {:?} of unknown peer failed with {:?}",
multiaddr, error);
this.behaviour.inject_addr_reach_failure(None, &multiaddr, &error);
return Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr {
address: multiaddr,
error,
});
},
}
if let Some((peer_id, handler, event)) = this.pending_event.take() {
if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() {
match handler {
PendingNotifyHandler::One(conn_id) =>
if let Some(mut conn) = peer.connection(conn_id) {
if let Some(event) = notify_one(&mut conn, event, cx) {
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
}
},
PendingNotifyHandler::Any(ids) => {
if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
}
}
}
}
}
debug_assert!(this.pending_event.is_none());
let behaviour_poll = {
let mut parameters = SwarmPollParameters {
local_peer_id: &mut this.network.local_peer_id(),
supported_protocols: &this.supported_protocols,
listened_addrs: &this.listened_addrs,
external_addrs: &this.external_addrs
};
this.behaviour.poll(cx, &mut parameters)
};
match behaviour_poll {
Poll::Pending if network_not_ready => return Poll::Pending,
Poll::Pending => (),
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
return Poll::Ready(SwarmEvent::Behaviour(event))
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
let _ = ExpandedSwarm::dial_addr(&mut *this, address);
},
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => {
if this.banned_peers.contains(&peer_id) {
this.behaviour.inject_dial_failure(&peer_id);
} else {
let condition_matched = match condition {
DialPeerCondition::Disconnected => this.network.is_disconnected(&peer_id),
DialPeerCondition::NotDialing => !this.network.is_dialing(&peer_id),
DialPeerCondition::Always => true,
};
if condition_matched {
if ExpandedSwarm::dial(this, &peer_id).is_ok() {
return Poll::Ready(SwarmEvent::Dialing(peer_id))
}
} else {
log::trace!("Condition for new dialing attempt to {:?} not met: {:?}",
peer_id, condition);
let self_listening = &this.listened_addrs;
if let Some(mut peer) = this.network.peer(peer_id.clone()).into_dialing() {
let addrs = this.behaviour.addresses_of_peer(peer.id());
let mut attempt = peer.some_attempt();
for a in addrs {
if !self_listening.contains(&a) {
attempt.add_address(a);
}
}
}
}
}
},
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => {
if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() {
match handler {
NotifyHandler::One(connection) => {
if let Some(mut conn) = peer.connection(connection) {
if let Some(event) = notify_one(&mut conn, event, cx) {
let handler = PendingNotifyHandler::One(connection);
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
}
}
}
NotifyHandler::Any => {
let ids = peer.connections().into_ids().collect();
if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
}
}
}
}
},
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
for addr in this.network.address_translation(&address) {
if this.external_addrs.iter().all(|a| &a.addr != &addr) {
this.behaviour.inject_new_external_addr(&addr);
}
this.external_addrs.add(addr, score);
}
},
}
}
}
}
enum PendingNotifyHandler {
One(ConnectionId),
Any(SmallVec<[ConnectionId; 10]>),
}
fn notify_one<'a, TInEvent>(
conn: &mut EstablishedConnection<'a, TInEvent>,
event: TInEvent,
cx: &mut Context<'_>,
) -> Option<TInEvent>
{
match conn.poll_ready_notify_handler(cx) {
Poll::Pending => Some(event),
Poll::Ready(Err(())) => None,
Poll::Ready(Ok(())) => {
let _ = conn.notify_handler(event);
None
}
}
}
fn notify_any<'a, TTrans, TInEvent, TOutEvent, THandler>(
ids: SmallVec<[ConnectionId; 10]>,
peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
event: TInEvent,
cx: &mut Context<'_>,
) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
let mut pending = SmallVec::new();
let mut event = Some(event);
for id in ids.into_iter() {
if let Some(mut conn) = peer.connection(id) {
match conn.poll_ready_notify_handler(cx) {
Poll::Pending => pending.push(id),
Poll::Ready(Err(())) => {}
Poll::Ready(Ok(())) => {
let e = event.take().expect("by (1),(2)");
if let Err(e) = conn.notify_handler(e) {
event = Some(e)
} else {
break
}
}
}
}
}
event.and_then(|e|
if !pending.is_empty() {
Some((e, pending))
} else {
None
})
}
impl<TBehaviour, TInEvent, TOutEvent, THandler> Stream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
{
type Item = TBehaviour::OutEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let event = futures::ready!(ExpandedSwarm::poll_next_event(self.as_mut(), cx));
if let SwarmEvent::Behaviour(event) = event {
return Poll::Ready(Some(event));
}
}
}
}
impl<TBehaviour, TInEvent, TOutEvent, THandler> FusedStream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
{
fn is_terminated(&self) -> bool {
false
}
}
pub struct SwarmPollParameters<'a> {
local_peer_id: &'a PeerId,
supported_protocols: &'a [Vec<u8>],
listened_addrs: &'a [Multiaddr],
external_addrs: &'a Addresses,
}
impl<'a> PollParameters for SwarmPollParameters<'a> {
type SupportedProtocolsIter = std::vec::IntoIter<Vec<u8>>;
type ListenedAddressesIter = std::vec::IntoIter<Multiaddr>;
type ExternalAddressesIter = AddressIntoIter;
fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
self.supported_protocols.to_vec().into_iter()
}
fn listened_addresses(&self) -> Self::ListenedAddressesIter {
self.listened_addrs.to_vec().into_iter()
}
fn external_addresses(&self) -> Self::ExternalAddressesIter {
self.external_addrs.clone().into_iter()
}
fn local_peer_id(&self) -> &PeerId {
self.local_peer_id
}
}
pub struct SwarmBuilder<TBehaviour> {
local_peer_id: PeerId,
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
network_config: NetworkConfig,
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
}
impl<TBehaviour> SwarmBuilder<TBehaviour>
where TBehaviour: NetworkBehaviour,
{
pub fn new(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId
) -> Self {
SwarmBuilder {
local_peer_id,
transport: transport,
behaviour,
network_config: Default::default(),
substream_upgrade_protocol_override: None,
}
}
pub fn executor(mut self, e: Box<dyn Executor + Send>) -> Self {
self.network_config = self.network_config.with_executor(e);
self
}
pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
self.network_config = self.network_config.with_notify_handler_buffer_size(n);
self
}
pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
self.network_config = self.network_config.with_connection_event_buffer_size(n);
self
}
pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
self.network_config = self.network_config.with_connection_limits(limits);
self
}
pub fn substream_upgrade_protocol_override(mut self, v: libp2p_core::upgrade::Version) -> Self {
self.substream_upgrade_protocol_override = Some(v);
self
}
pub fn build(mut self) -> Swarm<TBehaviour> {
let supported_protocols = self.behaviour
.new_handler()
.inbound_protocol()
.protocol_info()
.into_iter()
.map(|info| info.protocol_name().to_vec())
.collect();
let network_cfg = self.network_config.or_else_with_executor(|| {
match ThreadPoolBuilder::new()
.name_prefix("libp2p-swarm-task-")
.create()
{
Ok(tp) => {
Some(Box::new(move |f| tp.spawn_ok(f)))
},
Err(err) => {
log::warn!("Failed to create executor thread pool: {:?}", err);
None
}
}
});
let network = Network::new(self.transport, self.local_peer_id, network_cfg);
ExpandedSwarm {
network,
behaviour: self.behaviour,
supported_protocols,
listened_addrs: SmallVec::new(),
external_addrs: Addresses::default(),
banned_peers: HashSet::new(),
pending_event: None,
substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
}
}
}
#[derive(Debug)]
pub enum DialError {
Banned,
ConnectionLimit(ConnectionLimit),
NoAddresses
}
impl fmt::Display for DialError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
DialError::Banned => write!(f, "Dial error: peer is banned.")
}
}
}
impl error::Error for DialError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
DialError::ConnectionLimit(err) => Some(err),
DialError::NoAddresses => None,
DialError::Banned => None
}
}
}
#[derive(Clone, Default)]
pub struct DummyBehaviour {
}
impl NetworkBehaviour for DummyBehaviour {
type ProtocolsHandler = protocols_handler::DummyProtocolsHandler;
type OutEvent = void::Void;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
protocols_handler::DummyProtocolsHandler::default()
}
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, _: &PeerId) {}
fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}
fn inject_disconnected(&mut self, _: &PeerId) {}
fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}
fn inject_event(&mut self, _: PeerId, _: ConnectionId,
_: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent) {}
fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) ->
Poll<NetworkBehaviourAction<<Self::ProtocolsHandler as
ProtocolsHandler>::InEvent, Self::OutEvent>>
{
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use crate::protocols_handler::DummyProtocolsHandler;
use crate::test::{MockBehaviour, CallTraceBehaviour};
use futures::{future, executor};
use libp2p_core::{
identity,
upgrade,
multiaddr,
transport
};
use libp2p_noise as noise;
use super::*;
fn new_test_swarm<T, O>(handler_proto: T) -> Swarm<CallTraceBehaviour<MockBehaviour<T, O>>>
where
T: ProtocolsHandler + Clone,
T::OutEvent: Clone,
O: Send + 'static
{
let id_keys = identity::Keypair::generate_ed25519();
let pubkey = id_keys.public();
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys).unwrap();
let transport = transport::MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(libp2p_mplex::MplexConfig::new())
.boxed();
let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
SwarmBuilder::new(transport, behaviour, pubkey.into()).build()
}
#[test]
fn test_connect_disconnect_ban() {
let mut handler_proto = DummyProtocolsHandler::default();
handler_proto.keep_alive = KeepAlive::Yes;
let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone());
let mut swarm2 = new_test_swarm::<_, ()>(handler_proto);
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
Swarm::listen_on(&mut swarm1, addr1.clone().into()).unwrap();
Swarm::listen_on(&mut swarm2, addr2.clone().into()).unwrap();
enum State {
Connecting,
Disconnecting,
}
let swarm1_id = Swarm::local_peer_id(&swarm1).clone();
let mut banned = false;
let mut unbanned = false;
let num_connections = 10;
for _ in 0 .. num_connections {
Swarm::dial_addr(&mut swarm1, addr2.clone()).unwrap();
}
let mut state = State::Connecting;
executor::block_on(future::poll_fn(move |cx| {
loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match state {
State::Connecting => {
for s in &[&swarm1, &swarm2] {
if s.behaviour.inject_connection_established.len() > 0 {
assert_eq!(s.behaviour.inject_connected.len(), 1);
} else {
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
assert!(s.behaviour.inject_connection_closed.len() == 0);
assert!(s.behaviour.inject_disconnected.len() == 0);
}
if [&swarm1, &swarm2].iter().all(|s| {
s.behaviour.inject_connection_established.len() == num_connections
}) {
if banned {
return Poll::Ready(())
}
Swarm::ban_peer_id(&mut swarm2, swarm1_id.clone());
swarm1.behaviour.reset();
swarm2.behaviour.reset();
banned = true;
state = State::Disconnecting;
}
}
State::Disconnecting => {
for s in &[&swarm1, &swarm2] {
if s.behaviour.inject_connection_closed.len() < num_connections {
assert_eq!(s.behaviour.inject_disconnected.len(), 0);
} else {
assert_eq!(s.behaviour.inject_disconnected.len(), 1);
}
assert_eq!(s.behaviour.inject_connection_established.len(), 0);
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
if [&swarm1, &swarm2].iter().all(|s| {
s.behaviour.inject_connection_closed.len() == num_connections
}) {
if unbanned {
return Poll::Ready(())
}
Swarm::unban_peer_id(&mut swarm2, swarm1_id.clone());
swarm1.behaviour.reset();
swarm2.behaviour.reset();
unbanned = true;
for _ in 0 .. num_connections {
Swarm::dial_addr(&mut swarm2, addr1.clone()).unwrap();
}
state = State::Connecting;
}
}
}
if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending
}
}
}))
}
}