pub mod connection;
mod control;
mod dial;
mod muxer;
pub mod network;
mod registry;
pub mod cli;
pub mod identify;
pub mod metrics;
pub mod ping;
pub mod protocol_handler;
pub mod substream;
pub use control::Control;
pub use protocol_handler::DummyProtocolHandler;
use fnv::FnvHashMap;
use futures::channel::{mpsc, oneshot};
use futures::future::Either;
use futures::prelude::*;
use smallvec::SmallVec;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use std::{error, fmt};
use async_std::task;
use libp2prs_core::peerstore::{PeerStore, ADDRESS_TTL};
use libp2prs_core::{
multiaddr::{protocol, Multiaddr},
muxing::IStreamMuxer,
transport::{upgrade::ITransportEx, TransportError},
PeerId, ProtocolId, PublicKey,
};
use crate::connection::{Connection, ConnectionId, ConnectionLimit, ConnectionView, Direction};
use crate::control::{DumpCommand, SwarmControlCmd};
use crate::dial::EitherDialAddr;
use crate::identify::{IdentifyConfig, IdentifyHandler, IdentifyInfo, IdentifyPushHandler};
use crate::metrics::metric::Metric;
use crate::muxer::Muxer;
use crate::network::NetworkInfo;
use crate::ping::{PingConfig, PingHandler};
use crate::protocol_handler::IProtocolHandler;
use crate::registry::Addresses;
use crate::substream::{ConnectInfo, StreamId, Substream, SubstreamView};
use libp2prs_core::routing::IRouting;
use libp2prs_core::translation::address_translation;
use libp2prs_core::transport::ListenerEvent;
/// Crate-local result alias whose error type is always [`SwarmError`].
type Result<T> = std::result::Result<T, SwarmError>;
/// Pause between two runs of the peer store's expired-address purge (10 minutes).
pub const PEERSTORE_GC_PURGE_INTERVAL: Duration = Duration::from_secs(10 * 60);
/// Protocol version string placed into the local identify information.
const LIBP2P_RS_PROTOCOL_VERSION: &str = "ipfs/0.1.0";
/// Agent version string placed into the local identify information.
const LIBP2P_RS_AGENT_VERSION: &str = "libp2p-rs/0.1.0";
/// Events delivered to the swarm main loop through its internal event channel.
///
/// Every variant is dispatched by `Swarm::on_event`.
#[derive(Debug)]
pub enum SwarmEvent {
/// A new connection is ready. Listener tasks send it with `Direction::Inbound`
/// and `tid: None`; when `tid` is `Some`, the matching dial callback is invoked.
ConnectionEstablished {
stream_muxer: IStreamMuxer,
direction: Direction,
tid: Option<TransactionId>,
},
/// Sent by a connection's accept task when accepting a substream fails;
/// the connection is then removed from the swarm.
ConnectionClosed {
cid: ConnectionId,
error: TransportError,
},
/// Sent when protocol selection for an inbound substream fails.
/// The main loop currently ignores this event.
StreamError {
cid: ConnectionId,
error: TransportError,
},
/// Sent after an inbound substream finished protocol selection; the view is
/// recorded on the owning connection.
StreamOpened {
view: SubstreamView,
},
/// Sent by a listener task when accepting fails. The listener task in this
/// file always fills `remote_addr` with an empty address.
IncomingConnectionError {
remote_addr: Multiaddr,
error: TransportError,
},
/// A dial transaction failed; the callback registered under `tid` receives `error`.
OutgoingConnectionError {
peer_id: PeerId,
error: SwarmError,
tid: TransactionId,
},
/// A listener reported a new address; it is added to the listened addresses
/// unless already present.
ListenAddressAdded(Multiaddr),
/// A listener reported that an address is gone; it is removed from the
/// listened addresses if present.
ListenAddressDeleted(Multiaddr),
/// A listener was closed. The main loop currently ignores this event.
ListenerClosed {
addresses: Vec<Multiaddr>,
reason: TransportError,
},
/// Outcome of a ping on connection `cid`; on error the connection is closed.
PingResult {
cid: ConnectionId,
result: Result<Duration>,
},
/// Outcome of identify on connection `cid`; on success it carries the remote's
/// identify info together with the address the remote observed for us.
IdentifyResult {
cid: ConnectionId,
result: Result<(IdentifyInfo, Multiaddr)>,
},
}
/// Registry of the transports known to the swarm.
///
/// Entries are keyed by the `u32` ids a transport reports from `protocols()`;
/// lookups use the key of the last protocol component of a multiaddr.
#[derive(Clone, Default)]
struct Transports {
// protocol id -> transport serving that protocol
inner: FnvHashMap<u32, ITransportEx>,
}
impl Transports {
    /// Looks up the transport registered for the last protocol component of `addr`
    /// and returns a boxed clone of it.
    ///
    /// # Errors
    /// Returns `SwarmError::Transport(TransportError::MultiaddrNotSupported(_))`
    /// carrying the complete, unmodified `addr` when the address is empty, the key
    /// of its last protocol cannot be obtained, or no transport is registered under
    /// that key.
    pub(crate) fn lookup_by_addr(&self, addr: Multiaddr) -> Result<ITransportEx> {
        log::debug!("lookup transport for addr={}", addr);
        // Peek at the last component instead of popping it: popping truncates `addr`,
        // which made the error below report a different address than the one requested.
        let found = addr
            .iter()
            .last()
            .and_then(|proto| proto.get_key().ok())
            .and_then(|id| self.inner.get(&id))
            .map(|transport| transport.box_clone());
        match found {
            Some(transport) => Ok(transport),
            None => Err(SwarmError::Transport(TransportError::MultiaddrNotSupported(addr))),
        }
    }

    /// Returns the transport registered under protocol id `id`, if any.
    pub(crate) fn get(&self, id: &u32) -> Option<&ITransportEx> {
        self.inner.get(id)
    }

    /// Registers `transport` under protocol id `id`, returning the transport that was
    /// previously registered under the same id, if any.
    pub(crate) fn add(&mut self, id: u32, transport: ITransportEx) -> Option<ITransportEx> {
        self.inner.insert(id, transport)
    }
}
/// One-shot callback invoked with the outcome of a dial transaction: the freshly
/// opened connection on success, or the dial error.
type DialCallback = Box<dyn FnOnce(Result<&mut Connection>) + Send>;
/// Identifier that ties a dial request to its pending [`DialCallback`].
type TransactionId = usize;
/// The swarm: owns transports, protocol handlers, connections and the channels
/// that drive its main loop.
pub struct Swarm {
// Peer addresses, keys and protocols; loaded in `new`, saved when the main loop exits.
peer_store: PeerStore,
// Protocol handlers, used for protocol selection on inbound substreams.
muxer: Muxer,
// Optional routing, used for dialing when the peer store has no addresses for a peer.
routing: Option<IRouting>,
// Registered transports, keyed by protocol id.
transports: Transports,
// Local public key and the peer id derived from it.
public_key: PublicKey,
local_peer_id: PeerId,
// Last connection id handed out; incremented before each use.
next_connection_id: usize,
// Addresses reported by the listeners.
listened_addrs: SmallVec<[Multiaddr; 8]>,
// Addresses derived from the observed addresses reported through identify.
external_addrs: Addresses,
// Metrics object shared with connections, substreams and `Control`.
metric: Arc<Metric>,
// Maintained by `ban_peer_id` / `unban_peer_id`; not consulted elsewhere in this file.
banned_peers: HashSet<PeerId>,
// All open connections, plus an index from remote peer to its connection ids.
connections_by_id: FnvHashMap<ConnectionId, Connection>,
connections_by_peer: FnvHashMap<PeerId, Vec<ConnectionId>>,
// Internal event channel (unbounded) feeding `on_event`.
event_receiver: mpsc::UnboundedReceiver<SwarmEvent>,
event_sender: mpsc::UnboundedSender<SwarmEvent>,
// Control command channel feeding `on_command`; senders are handed out via `control()`.
ctrl_receiver: mpsc::Receiver<SwarmControlCmd>,
ctrl_sender: mpsc::Sender<SwarmControlCmd>,
// Performs the actual dialing and reports back through the event channel.
dialer: dial::AsyncDialer,
// Last transaction id handed out, and the callbacks of dials still in flight.
next_tid: TransactionId,
dial_transactions: FnvHashMap<TransactionId, DialCallback>,
}
#[allow(dead_code)]
impl Swarm {
/// Builds a swarm for the identity given by `key`.
///
/// The peer store is created with defaults and then asked to load its persisted
/// data; a load failure is only logged (and not even that for `UnexpectedEof`).
/// No transports, protocol handlers or routing are configured yet.
pub fn new(key: PublicKey) -> Self {
    let (event_sender, event_receiver) = mpsc::unbounded();
    let (ctrl_sender, ctrl_receiver) = mpsc::channel(0);

    let peer_store = PeerStore::default();
    let load_failure = peer_store
        .load_data()
        .err()
        .filter(|e| e.kind() != std::io::ErrorKind::UnexpectedEof);
    if let Some(e) = load_failure {
        log::info!("PeerStore load data error: {}", e);
    }

    let metric = Arc::new(Metric::new());
    let local_peer_id = key.clone().into_peer_id();

    Swarm {
        peer_store,
        muxer: Muxer::new(),
        routing: None,
        transports: Default::default(),
        public_key: key,
        local_peer_id,
        next_connection_id: 0,
        listened_addrs: Default::default(),
        external_addrs: Default::default(),
        banned_peers: Default::default(),
        connections_by_id: Default::default(),
        connections_by_peer: Default::default(),
        metric,
        event_receiver,
        event_sender,
        ctrl_receiver,
        ctrl_sender,
        dialer: dial::AsyncDialer::new(),
        next_tid: 0,
        dial_transactions: Default::default(),
    }
}
/// Hands out the next connection id; the first id returned is 1.
fn assign_cid(&mut self) -> usize {
    let cid = self.next_connection_id + 1;
    self.next_connection_id = cid;
    cid
}
/// Hands out the next dial transaction id; the first id returned is 1.
fn assign_tid(&mut self) -> TransactionId {
    let tid = self.next_tid + 1;
    self.next_tid = tid;
    tid
}
/// Registers `transport` for every protocol id it reports.
///
/// A transport already registered for one of those ids is replaced; the affected
/// protocols are logged beforehand.
///
/// # Panics
/// Panics when the transport reports no protocols, or when an already registered
/// protocol id cannot be converted by `Protocol::get_enum`.
pub fn with_transport(mut self, transport: ITransportEx) -> Self {
    let protocols = transport.protocols();
    assert!(!protocols.is_empty(), "Shouldn't happen: no protocols found in Transport");

    // Collect the protocols that are about to be overridden, purely for the log line.
    let overridden: Vec<protocol::Protocol> = protocols
        .iter()
        .filter(|id| self.transports.get(id).is_some())
        .map(|id| protocol::Protocol::get_enum(*id).unwrap())
        .collect();
    if !overridden.is_empty() {
        log::info!("transports already registered for protocol(s): {:?}", overridden);
    }

    for id in protocols.iter() {
        log::debug!("add protocol={}", id);
        self.transports.add(*id, transport.box_clone());
    }
    self
}
/// Builder step: registers the protocol handler `p` with the swarm's muxer.
pub fn with_protocol(self, p: IProtocolHandler) -> Self {
    let mut swarm = self;
    swarm.muxer.add_protocol_handler(p);
    swarm
}
/// Builder step: registers a ping handler configured by `ping`.
pub fn with_ping(mut self, ping: PingConfig) -> Self {
    let handler = PingHandler::new(ping);
    self.muxer.add_protocol_handler(Box::new(handler));
    self
}
/// Builder step: installs `routing`, replacing any routing set earlier.
pub fn with_routing(self, routing: IRouting) -> Self {
    let mut swarm = self;
    swarm.routing = Some(routing);
    swarm
}
/// Builder step: replaces the swarm's metric object with `metric`.
pub fn with_metric(self, metric: Metric) -> Self {
    let mut swarm = self;
    swarm.metric = Arc::new(metric);
    swarm
}
/// Builder step: registers the identify handler (given a control-command sender)
/// followed by the identify-push handler (given `config` and an event sender).
pub fn with_identify(mut self, config: IdentifyConfig) -> Self {
    self.muxer
        .add_protocol_handler(Box::new(IdentifyHandler::new(self.ctrl_sender.clone())));
    self.muxer
        .add_protocol_handler(Box::new(IdentifyPushHandler::new(config, self.event_sender.clone())));
    self
}
/// Creates a [`Control`] handle for this swarm, built from clones of the
/// control-command sender, the peer store and the metric object.
pub fn control(&self) -> Control {
Control::new(self.ctrl_sender.clone(), self.peer_store.clone(), self.metric.clone())
}
/// Main loop: waits for whichever of the event and control channels yields first
/// and dispatches the item.
///
/// Never returns `Ok`. Returns `SwarmError::Closing(2)` once the event channel is
/// closed and `SwarmError::Closing(1)` once the control channel is closed. Errors
/// returned by individual control commands are discarded.
async fn handle_messages(&mut self) -> Result<()> {
    loop {
        let next = future::select(self.event_receiver.next(), self.ctrl_receiver.next()).await;
        match next {
            Either::Left((Some(evt), _)) => self.on_event(evt),
            Either::Left((None, _)) => {
                log::debug!("Swarm event channel is closed, closing down...");
                return Err(SwarmError::Closing(2));
            }
            Either::Right((Some(cmd), _)) => {
                let _ = self.on_command(cmd);
            }
            Either::Right((None, _)) => return Err(SwarmError::Closing(1)),
        }
    }
}
/// Tells every registered protocol handler about the swarm's current own addresses.
fn kickoff_address_change(&mut self) {
    let addrs = self.get_self_addrs();
    self.muxer.protocol_handlers.values_mut().for_each(|handler| {
        handler.address_changed(addrs.clone());
    });
}
/// Dispatches one internal event to its handler.
///
/// Results returned by the handlers are deliberately discarded. `ListenerClosed`
/// and `StreamError` are currently no-ops.
fn on_event(&mut self, event: SwarmEvent) {
    log::trace!("Swarm event={:?}", event);
    match event {
        // Nothing to do for these at the moment.
        SwarmEvent::ListenerClosed { .. } | SwarmEvent::StreamError { .. } => {}

        // Listen address bookkeeping; handlers are only notified on an actual change.
        SwarmEvent::ListenAddressAdded(addr) => {
            let known = self.listened_addrs.contains(&addr);
            if !known {
                log::info!("New address added {}", addr);
                self.listened_addrs.push(addr);
                self.kickoff_address_change();
            }
        }
        SwarmEvent::ListenAddressDeleted(addr) => {
            let position = self.listened_addrs.iter().position(|a| a == &addr);
            if let Some(pos) = position {
                log::info!("Old address deleted {}", addr);
                self.listened_addrs.remove(pos);
                self.kickoff_address_change();
            }
        }

        // Connection life cycle.
        SwarmEvent::ConnectionEstablished {
            stream_muxer,
            direction,
            tid,
        } => {
            let _ = self.handle_connection_opened(stream_muxer, direction, tid);
        }
        SwarmEvent::ConnectionClosed { cid, .. } => {
            let _ = self.handle_connection_closed(cid);
        }
        SwarmEvent::OutgoingConnectionError { tid, peer_id, error } => {
            let _ = self.handle_connection_error(peer_id, error, tid);
        }
        SwarmEvent::IncomingConnectionError { remote_addr, error } => {
            log::debug!("incoming connection error for {:?} {:?}", remote_addr, error);
        }

        // Substream and protocol results.
        SwarmEvent::StreamOpened { view } => {
            let _ = self.handle_stream_opened(view);
        }
        SwarmEvent::PingResult { cid, result } => {
            let _ = self.handle_ping_result(cid, result);
        }
        SwarmEvent::IdentifyResult { cid, result } => {
            let _ = self.handle_identify_result(cid, result);
        }
    }
}
/// Dispatches one control command to its handler.
///
/// Always returns `Ok(())`: handler results and failures to deliver a reply (the
/// requester dropped its receiver) are discarded.
fn on_command(&mut self, cmd: SwarmControlCmd) -> Result<()> {
    log::trace!("Swarm control command={:?}", cmd);
    match cmd {
        SwarmControlCmd::Connect(peer_id, addrs, reply) => {
            let _ = self.on_connect(peer_id, addrs, reply);
        }
        SwarmControlCmd::NewConnection(peer_id, use_routing, reply) => {
            let _ = self.on_new_connection(peer_id, use_routing, reply);
        }
        SwarmControlCmd::CloseConnection(peer_id, reply) => {
            let _ = self.on_close_connection(peer_id, reply);
        }
        SwarmControlCmd::NewStream(peer_id, pids, use_routing, reply) => {
            let _ = self.on_new_stream(peer_id, pids, use_routing, reply);
        }
        SwarmControlCmd::CloseStream(cid, sid) => {
            let _ = self.on_close_stream(cid, sid);
        }
        SwarmControlCmd::SelfAddresses(reply) => {
            let _ = self.on_retrieve_own_addresses(move |addrs| {
                let _ = reply.send(addrs);
            });
        }
        SwarmControlCmd::NetworkInfo(reply) => {
            let _ = self.on_retrieve_network_info(move |info| {
                let _ = reply.send(info);
            });
        }
        SwarmControlCmd::IdentifyInfo(reply) => {
            let _ = self.on_retrieve_identify_info(move |info| {
                let _ = reply.send(info);
            });
        }
        SwarmControlCmd::Dump(DumpCommand::Connections(peer_id, reply)) => {
            let _ = self.on_retrieve_connection_views(peer_id, move |views| {
                let _ = reply.send(views);
            });
        }
        SwarmControlCmd::Dump(DumpCommand::Streams(peer_id, reply)) => {
            let _ = self.on_retrieve_substream_views(peer_id, move |views| {
                let _ = reply.send(views);
            });
        }
    }
    Ok(())
}
/// Handles `Connect`: replies `Ok(())` right away when a connection to `peer_id`
/// already exists, otherwise dials the given `addrs` and replies with the dial outcome.
fn on_connect(&mut self, peer_id: PeerId, addrs: Vec<Multiaddr>, reply: oneshot::Sender<Result<()>>) -> Result<()> {
    if self.get_best_conn(&peer_id).is_none() {
        self.dial_addr(peer_id, addrs, move |r: Result<&mut Connection>| {
            let _ = reply.send(r.map(|_| ()));
        });
        return Ok(());
    }
    let _ = reply.send(Ok(()));
    Ok(())
}
/// Handles `NewConnection`: replies `Ok(())` right away when a connection to
/// `peer_id` already exists, otherwise dials the peer (optionally through routing)
/// and replies with the dial outcome.
fn on_new_connection(&mut self, peer_id: PeerId, use_routing: bool, reply: oneshot::Sender<Result<()>>) -> Result<()> {
    if self.get_best_conn(&peer_id).is_none() {
        self.dial_peer(peer_id, use_routing, move |r: Result<&mut Connection>| {
            let _ = reply.send(r.map(|_| ()));
        });
        return Ok(());
    }
    let _ = reply.send(Ok(()));
    Ok(())
}
/// Handles `CloseConnection`: asks every connection of `peer_id` to close, then
/// replies `Ok(())` — also when the peer has no connection at all.
fn on_close_connection(&mut self, peer_id: PeerId, reply: oneshot::Sender<Result<()>>) -> Result<()> {
    let connections = self
        .connections_by_peer
        .get(&peer_id)
        .into_iter()
        .flatten()
        .filter_map(|id| self.connections_by_id.get(id));
    for connection in connections {
        connection.close();
    }
    let _ = reply.send(Ok(()));
    Ok(())
}
/// Handles `NewStream`: opens a substream for one of `pids` towards `peer_id`.
///
/// An existing connection is reused when there is one; otherwise the peer is dialed
/// first (optionally through routing). The substream, or the dial/open error, is
/// delivered through `reply`.
fn on_new_stream(
    &mut self,
    peer_id: PeerId,
    pids: Vec<ProtocolId>,
    use_routing: bool,
    reply: oneshot::Sender<Result<Substream>>,
) -> Result<()> {
    match self.get_best_conn(&peer_id) {
        Some(connection) => {
            log::debug!(
                "open a stream using the existing connection {:?} for {:?}",
                connection.id(),
                peer_id
            );
            connection.open_stream(pids, move |opened| {
                let _ = reply.send(opened.map_err(|e| e.into()));
            });
        }
        None => {
            log::debug!("dial and open a stream for {:?}", peer_id);
            self.dial_peer(peer_id.clone(), use_routing, |dialed: Result<&mut Connection>| match dialed {
                Err(e) => {
                    let _ = reply.send(Err(e));
                }
                Ok(connection) => {
                    connection.open_stream(pids, |opened| {
                        let _ = reply.send(opened.map_err(|e| e.into()));
                    });
                }
            });
        }
    }
    Ok(())
}
/// Handles `CloseStream` by forwarding to `handle_stream_closed`.
fn on_close_stream(&mut self, cid: ConnectionId, sid: StreamId) -> Result<()> {
self.handle_stream_closed(cid, sid)
}
/// Passes the swarm's own addresses to `f`. Always returns `Ok(())`.
fn on_retrieve_own_addresses(&mut self, f: impl FnOnce(Vec<Multiaddr>)) -> Result<()> {
    let addrs = self.get_self_addrs();
    f(addrs);
    Ok(())
}
/// Passes a snapshot of the network information to `f`. Always returns `Ok(())`.
fn on_retrieve_network_info(&mut self, f: impl FnOnce(NetworkInfo)) -> Result<()> {
    let info = self.get_network_info();
    f(info);
    Ok(())
}
/// Passes the local identify information to `f`. Always returns `Ok(())`.
fn on_retrieve_identify_info(&mut self, f: impl FnOnce(IdentifyInfo)) -> Result<()> {
    let info = self.get_identify_info();
    f(info);
    Ok(())
}
/// Passes the connection views (all, or only those of `pid` when given) to `f`.
/// Always returns `Ok(())`.
fn on_retrieve_connection_views(&mut self, pid: Option<PeerId>, f: impl FnOnce(Vec<ConnectionView>)) -> Result<()> {
    let views = self.get_connection_views(pid);
    f(views);
    Ok(())
}
/// Passes the substream views of peer `pid` (or the lookup error) to `f`.
/// Always returns `Ok(())` itself.
fn on_retrieve_substream_views(&mut self, pid: PeerId, f: impl FnOnce(Result<Vec<SubstreamView>>)) -> Result<()> {
    let views = self.get_substream_views(pid);
    f(views);
    Ok(())
}
/// Consumes the swarm and starts it by spawning two background tasks:
///
/// * a peer store GC task that removes expired addresses every
///   `PEERSTORE_GC_PURGE_INTERVAL`, and
/// * the main loop task running `handle_messages`.
///
/// When the main loop ends, it closes the channel watched by the GC task (which
/// makes that task quit) and asks the peer store to save its data.
pub fn start(self) {
let mut swarm = self;
let peer_store = swarm.peer_store.clone();
// Channel used only as a shutdown signal for the GC task; nothing is ever sent on it.
let (mut tx, mut rx) = mpsc::channel::<()>(0);
task::spawn(async move {
log::info!("starting Peerstore GC...");
loop {
// Wake up on whichever comes first: the shutdown signal or the purge timer.
let either = future::select(rx.next(), task::sleep(PEERSTORE_GC_PURGE_INTERVAL).boxed()).await;
match either {
Either::Left((_, _)) => break,
Either::Right((_, _)) => peer_store.remove_expired_addrs(),
}
}
log::info!("quitting Peerstore GC...");
});
task::spawn(async move {
log::info!("starting Swarm main loop...");
let r = swarm.handle_messages().await;
log::info!("quitting Swarm main loop, due to {:?}...", r);
// Closing the sender ends the stream on the GC side, which stops the GC task.
tx.close_channel();
if let Err(e) = swarm.peer_store.save_data() {
log::info!("PeerStore save data failed: {}", e);
}
log::info!("Swarm main loop exited");
});
}
/// Returns a view of every connection, or only of the connections to `peer_id`
/// when one is given.
fn get_connection_views(&self, peer_id: Option<PeerId>) -> Vec<ConnectionView> {
    let mut views = Vec::new();
    for connection in self.connections_by_id.values() {
        let selected = match peer_id.as_ref() {
            Some(pid) => pid == &connection.remote_peer(),
            None => true,
        };
        if selected {
            views.push(connection.to_view());
        }
    }
    views
}
/// Returns the views of all substreams on all connections to `peer_id`.
///
/// # Errors
/// Returns `SwarmError::NoConnection(peer_id)` when the peer has no entry in the
/// per-peer connection index.
fn get_substream_views(&self, peer_id: PeerId) -> Result<Vec<SubstreamView>> {
    match self.connections_by_peer.get(&peer_id) {
        None => Err(SwarmError::NoConnection(peer_id)),
        Some(cids) => {
            let views = cids
                .iter()
                .filter_map(|cid| self.connections_by_id.get(cid))
                .flat_map(|connection| connection.substream_view())
                .collect();
            Ok(views)
        }
    }
}
/// Builds a snapshot of the swarm's network state: local peer id, number of peers,
/// connections and active substreams.
///
/// The pending/established counters are always reported as 0 and `connection_info`
/// is always empty.
fn get_network_info(&self) -> NetworkInfo {
    let num_active_streams = self
        .connections_by_id
        .values()
        .fold(0, |total, connection| total + connection.num_streams());
    NetworkInfo {
        id: self.local_peer_id().clone(),
        num_peers: self.connections_by_peer.len(),
        num_connections: self.connections_by_id.len(),
        num_connections_pending: 0,
        num_connections_established: 0,
        num_active_streams,
        connection_info: vec![],
    }
}
/// Builds the local identify information: public key, protocol and agent version
/// strings, own addresses and the protocols supported by the registered handlers.
fn get_identify_info(&self) -> IdentifyInfo {
    let protocols = self
        .muxer
        .supported_protocols()
        .into_iter()
        .map(|pid| pid.to_string())
        .collect();
    IdentifyInfo {
        public_key: self.public_key.clone(),
        protocol_version: LIBP2P_RS_PROTOCOL_VERSION.to_string(),
        agent_version: LIBP2P_RS_AGENT_VERSION.to_string(),
        listen_addrs: self.get_self_addrs(),
        protocols,
    }
}
/// Starts a listener on each address in `addrs`.
///
/// Each failure is logged as a warning together with the address that actually
/// failed; failures do not stop the remaining addresses from being tried.
///
/// # Errors
/// Returns `SwarmError::CanNotListenOnAny` when `addrs` is non-empty and no
/// listener could be started. An empty `addrs` yields `Ok(())`.
pub fn listen_on(&mut self, addrs: Vec<Multiaddr>) -> Result<()> {
    if addrs.is_empty() {
        return Ok(());
    }
    let mut any_succeeded = false;
    for addr in addrs {
        // `add_listen_addr` consumes its argument, so keep `addr` for the warning.
        match self.add_listen_addr(addr.clone()) {
            Ok(()) => any_succeeded = true,
            Err(err) => log::warn!("listen on {} failed: {:?}", addr, err),
        }
    }
    if any_succeeded {
        Ok(())
    } else {
        Err(SwarmError::CanNotListenOnAny)
    }
}
/// Returns the swarm's own addresses: the external addresses followed by the
/// listened addresses, with every address appearing only once.
///
/// The first occurrence of an address wins, so the relative order is kept.
fn get_self_addrs(&self) -> Vec<Multiaddr> {
    let mut listen_addrs: Vec<Multiaddr> = Vec::new();
    // `Vec::dedup` only drops *adjacent* duplicates, which misses an address present
    // in both lists; check membership explicitly (the lists are expected to be short).
    for addr in self.external_addrs.iter().chain(self.listened_addrs.iter()) {
        if !listen_addrs.contains(addr) {
            listen_addrs.push(addr.clone());
        }
    }
    log::trace!("swarm self addresses: {:?}", listen_addrs);
    listen_addrs
}
/// Starts a listener on `addr` and spawns a task that forwards everything the
/// listener reports to the swarm's event channel.
///
/// The address the listener reports for itself, if any, is recorded as a listened
/// address immediately. The forwarding task stops once the event channel is closed,
/// i.e. when the swarm is gone.
///
/// # Errors
/// Fails when no transport is registered for `addr` or the transport cannot
/// listen on it.
fn add_listen_addr(&mut self, addr: Multiaddr) -> Result<()> {
    log::info!("starting a listener on {:?}", addr);
    let mut transport = self.transports.lookup_by_addr(addr.clone())?;
    let mut listener = transport.listen_on(addr)?;
    if let Some(addr) = listener.multi_addr() {
        log::info!("adding an actual listening address {}", addr);
        self.listened_addrs.push(addr.clone());
    }
    let mut tx = self.event_sender.clone();
    task::spawn(async move {
        loop {
            let event = match listener.accept().await {
                Ok(ListenerEvent::AddressAdded(addr)) => SwarmEvent::ListenAddressAdded(addr),
                Ok(ListenerEvent::AddressDeleted(addr)) => SwarmEvent::ListenAddressDeleted(addr),
                Ok(ListenerEvent::Accepted(muxer)) => SwarmEvent::ConnectionEstablished {
                    stream_muxer: muxer,
                    direction: Direction::Inbound,
                    tid: None,
                },
                Err(err) => SwarmEvent::IncomingConnectionError {
                    remote_addr: Multiaddr::empty(),
                    error: err,
                },
            };
            // A failed send means nobody consumes events any more. Keep accepting
            // would only leak this task (and spin if the listener keeps failing).
            if tx.send(event).await.is_err() {
                log::debug!("swarm event channel closed, stopping listener task");
                break;
            }
        }
    });
    Ok(())
}
/// Dials `peer_id` at the explicitly given `addrs`.
///
/// `f` is called right away with `SwarmError::DialToSelf` when `peer_id` is the
/// local peer. Otherwise it is stored under a fresh transaction id and the dialer
/// is started; the callback runs when the matching connection event arrives.
fn dial_addr<F: FnOnce(Result<&mut Connection>) + Send + 'static>(&mut self, peer_id: PeerId, addrs: Vec<Multiaddr>, f: F) {
    log::debug!("dialing {:?} with addrs={:?}", peer_id, addrs);
    if *self.local_peer_id() == peer_id {
        f(Err(SwarmError::DialToSelf));
        return;
    }

    let tid = self.assign_tid();
    self.dial_transactions.insert(tid, Box::new(f));

    let transports = self.transports.clone();
    let target = EitherDialAddr::Addresses(addrs);
    let event_sender = self.event_sender.clone();
    self.dialer.dial(peer_id, transports, target, event_sender, tid);
}
/// Dials `peer_id` using the addresses known to the peer store, or through routing
/// when none are known, `use_routing` is set and a routing is installed.
///
/// `f` is called right away with `SwarmError::DialToSelf` for the local peer, or
/// with `SwarmError::NoAddresses` when there is nothing to dial. Otherwise it is
/// stored under a fresh transaction id and the dialer is started.
fn dial_peer<F: FnOnce(Result<&mut Connection>) + Send + 'static>(&mut self, peer_id: PeerId, use_routing: bool, f: F) {
    log::debug!("dialing {:?}, with routing={}", peer_id, use_routing);
    if *self.local_peer_id() == peer_id {
        f(Err(SwarmError::DialToSelf));
        return;
    }

    // An empty address list is treated the same as no entry at all.
    let known_addrs = self.peer_store.get_addrs(&peer_id).filter(|list| !list.is_empty());
    let addrs = match (known_addrs, self.routing.as_ref()) {
        (Some(list), _) => dial::EitherDialAddr::Addresses(list),
        (None, Some(routing)) if use_routing => dial::EitherDialAddr::DHT(routing.box_clone()),
        (None, _) => {
            f(Err(SwarmError::NoAddresses(peer_id)));
            return;
        }
    };

    let tid = self.assign_tid();
    self.dial_transactions.insert(tid, Box::new(f));

    let transports = self.transports.clone();
    let event_sender = self.event_sender.clone();
    self.dialer.dial(peer_id, transports, addrs, event_sender, tid);
}
/// Picks the connection to `peer_id` that currently carries the most substreams.
///
/// Among connections with the same number of substreams the one listed last for
/// the peer wins. Returns `None` when the peer has no usable connection.
fn get_best_conn(&mut self, peer_id: &PeerId) -> Option<&mut connection::Connection> {
    log::trace!("trying to get the best connnection for {:?}", peer_id);
    let connections = &self.connections_by_id;
    // `max_by_key` returns the last of several equally loaded connections.
    let best = self
        .connections_by_peer
        .get(peer_id)?
        .iter()
        .filter_map(|id| connections.get(id).map(|c| (*id, c.num_streams())))
        .max_by_key(|&(_, num_streams)| num_streams)
        .map(|(id, _)| id)?;
    self.connections_by_id.get_mut(&best)
}
/// Tells whether at least one connection id is recorded for `peer_id`.
fn is_connected(&self, peer_id: &PeerId) -> bool {
    match self.connections_by_peer.get(peer_id) {
        Some(ids) => !ids.is_empty(),
        None => false,
    }
}
/// Returns the public key this swarm was created with.
pub fn local_pubkey(&self) -> &PublicKey {
&self.public_key
}
/// Returns the local peer id, derived from the public key in `new`.
pub fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
}
/// Adds `peer_id` to the set of banned peers.
// NOTE(review): the set is not consulted anywhere in this file — confirm where
// (or whether) the ban is enforced.
pub fn ban_peer_id(&mut self, peer_id: PeerId) {
self.banned_peers.insert(peer_id);
}
/// Removes `peer_id` from the set of banned peers; a peer that is not banned is
/// silently ignored.
pub fn unban_peer_id(&mut self, peer_id: PeerId) {
    self.banned_peers.remove(&peer_id);
}
/// Records `connection` in both indexes: by connection id and by remote peer.
fn add_connection(&mut self, connection: Connection) {
    let (cid, remote_peer_id) = (connection.id(), connection.remote_peer());
    self.connections_by_id.insert(cid, connection);
    self.connections_by_peer.entry(remote_peer_id).or_default().push(cid);
    log::trace!("connection added to hashmap, total={}", self.connections_by_id.len());
}
/// Sets up a newly established connection.
///
/// Creates the `Connection`, spawns its accept task (which negotiates a protocol for
/// every inbound substream and hands the substream to the matching handler),
/// notifies all protocol handlers, runs the dial callback when `tid` is given and
/// finally records the connection in the swarm's indexes.
///
/// # Panics
/// Panics when `tid` is `Some` but no dial callback is registered under it.
fn handle_connection_opened(&mut self, stream_muxer: IStreamMuxer, dir: Direction, tid: Option<TransactionId>) -> Result<()> {
log::debug!("handle_connection_opened: {:?} {:?}", stream_muxer, dir);
let metric = self.metric.clone();
let mut connection = Connection::new(
self.assign_cid(),
stream_muxer.clone(),
dir,
self.event_sender.clone(),
self.ctrl_sender.clone(),
metric.clone(),
);
// Everything below is cloned/copied so it can be moved into the accept task.
let mut tx = self.event_sender.clone();
let cid = connection.id();
let mut muxer = self.muxer.clone();
let ctrl = self.ctrl_sender.clone();
let handle = task::spawn(async move {
let mut stream_muxer = stream_muxer;
// If the stream muxer provides a task of its own, run it alongside the accept loop.
let task_handle = stream_muxer.task().map(task::spawn);
loop {
let metric = metric.clone();
let ctrl = ctrl.clone();
let r = stream_muxer.accept_stream().await;
match r {
Ok(raw_stream) => {
log::debug!("run protocol selection for inbound stream={:?}", raw_stream);
let result = muxer.select_inbound(raw_stream).await;
match result {
Ok((mut handler, raw_stream, proto)) => {
let la = stream_muxer.local_multiaddr();
let ra = stream_muxer.remote_multiaddr();
let rpid = stream_muxer.remote_peer();
let ci = ConnectInfo { la, ra, rpid };
let stream = Substream::new(raw_stream, metric, Direction::Inbound, proto.clone(), cid, ci, ctrl);
let view = stream.to_view();
// Report the new substream to the main loop before handing it to the handler.
let _ = tx.send(SwarmEvent::StreamOpened { view }).await;
// Each substream is served in its own task; the handler's result is discarded.
task::spawn(async move {
let _ = handler.handle(stream, proto).await;
});
}
Err(error) => {
// A failed negotiation only affects this substream; keep accepting.
log::debug!("failed inbound protocol selection {:?} {:?}", cid, error);
let _ = tx.send(SwarmEvent::StreamError { cid, error }).await;
}
}
}
Err(error) => {
// Failing to accept ends the loop and reports the connection as closed.
log::debug!("connection closed {:?} {:?}", cid, error);
let _ = tx.send(SwarmEvent::ConnectionClosed { cid, error }).await;
break;
}
}
}
// Wait for the stream muxer's own task, if one was spawned, before exiting.
if let Some(h) = task_handle {
h.await;
}
log::debug!("{:?} accept-task exiting...", stream_muxer);
});
connection.set_handle(handle);
for handler in self.muxer.protocol_handlers.values_mut() {
handler.connected(&mut connection);
}
// The dial callback runs before the connection is stored in the swarm's indexes.
if let Some(id) = tid {
log::debug!("invoking dial transaction {:?}", tid);
let callback = self.dial_transactions.remove(&id).expect("no match tid found");
callback(Ok(&mut connection));
}
self.add_connection(connection);
Ok(())
}
fn handle_connection_error(&mut self, peer_id: PeerId, error: SwarmError, tid: TransactionId) -> Result<()> {
log::debug!("handle_connection_error: {:?} {:?} tid={:?}", peer_id, error, tid);
let callback = self.dial_transactions.remove(&tid).expect("no match tid found");
callback(Err(error));
Ok(())
}
/// Records a newly opened substream on its connection; a view for an unknown
/// connection is dropped. Always returns `Ok(())`.
fn handle_stream_opened(&mut self, view: SubstreamView) -> Result<()> {
    log::debug!("handle_stream_opened: {:?}", view);
    let owner = self.connections_by_id.get_mut(&view.cid);
    if let Some(connection) = owner {
        connection.add_stream(view);
    }
    Ok(())
}
/// Removes substream `sid` from connection `cid`; an unknown connection is ignored.
/// Always returns `Ok(())`.
fn handle_stream_closed(&mut self, cid: ConnectionId, sid: StreamId) -> Result<()> {
    log::debug!("handle_stream_closed: {:?}/{:?}", cid, sid);
    let owner = self.connections_by_id.get_mut(&cid);
    if let Some(connection) = owner {
        connection.del_stream(sid);
    }
    Ok(())
}
/// Removes connection `cid` from the swarm.
///
/// Notifies all protocol handlers, updates the per-peer index (dropping the peer's
/// entry once its last connection is gone) and spawns a task that waits for the
/// connection and stops its ping/identify activities. Always returns `Ok(())`.
fn handle_connection_closed(&mut self, cid: ConnectionId) -> Result<()> {
    log::debug!("handle_connection_closed: {:?}", cid);
    let mut connection = match self.connections_by_id.remove(&cid) {
        Some(connection) => connection,
        None => {
            log::info!("shouldn't happen, wired connection {:?}", cid);
            return Ok(());
        }
    };

    for handler in self.muxer.protocol_handlers.values_mut() {
        handler.disconnected(&mut connection);
    }

    let remote_peer_id = connection.remote_peer();
    match self.connections_by_peer.get_mut(&remote_peer_id) {
        Some(ids) => {
            ids.retain(|id| *id != cid);
            if ids.is_empty() {
                self.connections_by_peer.remove(&remote_peer_id);
            }
        }
        None => log::warn!("shouldn't happen, PeerId={:?}", remote_peer_id),
    }

    // The connection is torn down in the background so the main loop is not blocked.
    task::spawn(async move {
        let _ = connection.wait().await;
        let _ = connection.stop_ping().await;
        let _ = connection.stop_identify().await;
        let _ = connection.stop_identify_push().await;
    });
    Ok(())
}
/// Processes a ping result for connection `cid`.
///
/// On success the reported duration is passed to the peer store's `update_addr`
/// for the remote peer; on failure the connection is closed. Results for unknown
/// connections are ignored. Always returns `Ok(())`.
fn handle_ping_result(&mut self, cid: ConnectionId, result: Result<Duration>) -> Result<()> {
    log::trace!("handle_ping_result: {:?} {:?}", cid, result);
    let connection = match self.connections_by_id.get_mut(&cid) {
        Some(connection) => connection,
        None => return Ok(()),
    };
    match result {
        Err(_) => {
            log::info!("reach the max ping failure count, closing {:?}", connection);
            connection.close();
        }
        Ok(ttl) => {
            log::trace!("ping TTL={:?} for {:?}", ttl, connection);
            let peer_id = connection.stream_muxer().remote_peer();
            self.peer_store.update_addr(&peer_id, ttl);
        }
    }
    Ok(())
}
/// Translates the address a remote observed for us against the listened addresses
/// and adds every resulting address to the external addresses.
fn handle_observed_address(&mut self, observed_addr: Multiaddr, cid: ConnectionId) {
    log::debug!("identify observed_addr: {} cid={:?}", observed_addr, cid);
    // Collected first: the iterator borrows `self`, which is mutated right below.
    let translated: Vec<Multiaddr> = self.address_translation(&observed_addr).collect();
    translated.into_iter().for_each(|addr| {
        self.external_addrs.add(addr);
    });
    log::debug!("external address: {:?}", self.external_addrs)
}
/// Processes an identify result for connection `cid`.
///
/// On success the observed address is handled, the remote's listen addresses, key
/// and protocols are stored in the peer store and every protocol handler is told
/// that the peer was identified. A failure is only logged. Results for unknown
/// connections are ignored. Always returns `Ok(())`.
fn handle_identify_result(&mut self, cid: ConnectionId, result: Result<(IdentifyInfo, Multiaddr)>) -> Result<()> {
    log::debug!("handle_identify_result: {:?}", cid);
    let connection = match self.connections_by_id.get_mut(&cid) {
        Some(connection) => connection,
        None => return Ok(()),
    };
    match result {
        Err(err) => {
            log::info!("identify failed {:?} for {:?}", err, connection);
        }
        Ok((info, observed_addr)) => {
            let remote_pubkey = connection.remote_pub_key();
            let peer_id = connection.remote_peer();
            self.handle_observed_address(observed_addr, cid);
            log::debug!(
                "identified peer addresses {:?} protocols {:?} for {}",
                info.listen_addrs,
                info.protocols,
                peer_id
            );
            self.peer_store.add_addrs(&peer_id, info.listen_addrs, ADDRESS_TTL);
            self.peer_store.add_key(&peer_id, remote_pubkey);
            self.peer_store.add_protocols(&peer_id, info.protocols);
            self.muxer.protocol_handlers.values_mut().for_each(|handler| {
                handler.identified(peer_id.clone());
            });
        }
    }
    Ok(())
}
fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr) -> impl Iterator<Item = Multiaddr> + 'a {
let mut addrs: Vec<_> = self
.listened_addrs
.iter()
.filter_map(move |server| address_translation(server, observed_addr))
.collect();
addrs.sort_unstable();
addrs.dedup();
addrs.into_iter()
}
}
/// Errors produced by the swarm.
#[derive(Debug)]
pub enum SwarmError {
/// A connection limit was hit; reported as a dial error.
ConnectionLimit(ConnectionLimit),
/// Nothing to dial: no addresses are known for the peer.
NoAddresses(PeerId),
/// The peer has no connections.
NoConnection(PeerId),
/// The peer id is invalid; reported as a dial error.
InvalidPeerId(PeerId),
/// A swarm channel was closed; the number tells which one
/// (the main loop uses 1 for the control channel and 2 for the event channel).
Closing(u32),
/// An error from the transport layer.
Transport(TransportError),
/// Internal error; also used when channel send/receive errors are converted.
Internal,
/// None of the requested listen addresses could be listened on.
CanNotListenOnAny,
/// A dial to the local peer was attempted.
DialToSelf,
/// Dial backoff.
DialBackoff,
/// All dials failed.
AllDialsFailed,
/// Dialing the address timed out; the number is the timeout in seconds.
DialTimeout(Multiaddr, u64),
/// The maximum number of dial attempts was exceeded; carries the count.
MaxDialAttempts(u32),
/// The maximum number of concurrent dials was exceeded; carries the count.
ConcurrentDialLimit(u32),
}
#[rustfmt::skip]
impl fmt::Display for SwarmError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SwarmError::ConnectionLimit(err) => write!(f, "Swarm Dial error: {}", err),
SwarmError::NoAddresses(peer_id) => write!(f, "Swarm Dial error: no addresses for peer{:?}.", peer_id),
SwarmError::NoConnection(peer_id) => write!(f, "Swarm Stream error: no connections for peer{:?}.", peer_id),
SwarmError::InvalidPeerId(peer_id) => write!(f, "Swarm Dial error: invalid peer id{:?}.", peer_id),
SwarmError::Transport(err) => write!(f, "Swarm Transport error: {}.", err),
SwarmError::Internal => write!(f, "Swarm internal error."),
SwarmError::Closing(s) => write!(f, "Swarm channel closed source={}.", s),
SwarmError::CanNotListenOnAny => write!(f, "Failed to listen on any addresses"),
SwarmError::DialToSelf => write!(f, "Swarm Dial error:dial to self attempted"),
SwarmError::DialBackoff => write!(f, "Swarm Dial error:dial backoff"),
SwarmError::AllDialsFailed => write!(f, "Swarm Dial error:all dials failed"),
SwarmError::DialTimeout(ma, t) => write!(f, "Swarm Dial error:dial timeout, addr={:?},timeout={:?}", ma, Duration::from_secs(*t)),
SwarmError::MaxDialAttempts(c) => write!(f, "Swarm Dial error:max dial attempts exceeded, count={}", c),
SwarmError::ConcurrentDialLimit(c) => write!(f, "Swarm Dial error:max concurrent dial exceeded, count={}", c),
}
}
}
impl error::Error for SwarmError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
SwarmError::ConnectionLimit(err) => Some(err),
SwarmError::NoAddresses(_) => None,
SwarmError::NoConnection(_) => None,
SwarmError::InvalidPeerId(_) => None,
SwarmError::Transport(err) => Some(err),
SwarmError::Internal => None,
SwarmError::Closing(_) => None,
SwarmError::CanNotListenOnAny => None,
SwarmError::DialToSelf => None,
SwarmError::DialBackoff => None,
SwarmError::AllDialsFailed => None,
SwarmError::DialTimeout(_, _) => None,
SwarmError::MaxDialAttempts(_) => None,
SwarmError::ConcurrentDialLimit(_) => None,
}
}
}
/// I/O errors are wrapped as transport I/O errors.
impl From<std::io::Error> for SwarmError {
    fn from(err: std::io::Error) -> Self {
        let transport_err = TransportError::IoError(err);
        Self::Transport(transport_err)
    }
}
impl From<TransportError> for SwarmError {
fn from(err: TransportError) -> Self {
SwarmError::Transport(err)
}
}
/// A failed channel send is reported as an internal error; the details are dropped.
impl From<mpsc::SendError> for SwarmError {
    fn from(_: mpsc::SendError) -> Self {
        Self::Internal
    }
}
/// A canceled oneshot reply is reported as an internal error.
impl From<oneshot::Canceled> for SwarmError {
    fn from(_: oneshot::Canceled) -> Self {
        Self::Internal
    }
}