use smallvec::SmallVec;
use std::hash::Hash;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{error::Error, fmt};
use futures::channel::{mpsc, oneshot};
use futures::prelude::*;
use async_std::task;
use async_std::task::JoinHandle;
use libp2prs_core::identity::Keypair;
use libp2prs_core::multistream::Negotiator;
use libp2prs_core::muxing::IStreamMuxer;
use libp2prs_core::transport::TransportError;
use libp2prs_core::PublicKey;
use crate::connection::Direction::Outbound;
use crate::control::SwarmControlCmd;
use crate::identify::{IDENTIFY_PROTOCOL, IDENTIFY_PUSH_PROTOCOL};
use crate::metrics::metric::Metric;
use crate::ping::PING_PROTOCOL;
use crate::substream::{ConnectInfo, StreamId, Substream, SubstreamView};
use crate::{identify, ping, Multiaddr, PeerId, ProtocolId, SwarmError, SwarmEvent};
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Direction {
Outbound,
Inbound,
}
impl fmt::Display for Direction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", if self == &Outbound { "Out" } else { "In " })
}
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ConnectionId(usize);
impl fmt::Display for ConnectionId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:<5}", self.0)
}
}
#[allow(dead_code)]
pub struct Connection {
id: ConnectionId,
stream_muxer: IStreamMuxer,
tx: mpsc::UnboundedSender<SwarmEvent>,
ctrl: mpsc::Sender<SwarmControlCmd>,
substreams: SmallVec<[SubstreamView; 8]>,
dir: Direction,
ping_running: Arc<AtomicBool>,
ping_failures: u32,
identity: Option<()>,
handle: Option<JoinHandle<()>>,
ping_handle: Option<JoinHandle<()>>,
identify_handle: Option<JoinHandle<()>>,
identify_push_handle: Option<JoinHandle<()>>,
metric: Arc<Metric>,
}
impl PartialEq for Connection {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection")
.field("id", &self.id)
.field("muxer", &self.stream_muxer)
.field("dir", &self.dir)
.field("subs", &self.substreams)
.finish()
}
}
#[allow(dead_code)]
impl Connection {
pub(crate) fn new(
id: usize,
stream_muxer: IStreamMuxer,
dir: Direction,
tx: mpsc::UnboundedSender<SwarmEvent>,
ctrl: mpsc::Sender<SwarmControlCmd>,
metric: Arc<Metric>,
) -> Self {
Connection {
id: ConnectionId(id),
stream_muxer,
tx,
ctrl,
dir,
substreams: Default::default(),
handle: None,
ping_running: Arc::new(AtomicBool::new(false)),
ping_failures: 0,
ping_handle: None,
identity: None,
identify_handle: None,
identify_push_handle: None,
metric,
}
}
pub(crate) fn to_view(&self) -> ConnectionView {
ConnectionView {
id: self.id,
dir: self.dir,
info: self.info(),
substreams: self.substreams.clone(),
}
}
pub(crate) fn substream_view(&self) -> Vec<SubstreamView> {
self.substreams.to_vec()
}
pub(crate) fn id(&self) -> ConnectionId {
self.id
}
pub(crate) fn stream_muxer(&self) -> &IStreamMuxer {
&self.stream_muxer
}
pub(crate) fn set_handle(&mut self, handle: JoinHandle<()>) {
self.handle = Some(handle);
}
pub fn open_stream<T: Send + 'static>(
&mut self,
pids: Vec<ProtocolId>,
f: impl FnOnce(Result<Substream, TransportError>) -> T + Send + 'static,
) -> JoinHandle<T> {
let cid = self.id();
let stream_muxer = self.stream_muxer().clone();
let mut tx = self.tx.clone();
let ctrl = self.ctrl.clone();
let metric = self.metric.clone();
task::spawn(async move {
let result = open_stream_internal(cid, stream_muxer, pids, ctrl, metric).await;
let event = match result.as_ref() {
Ok(sub_stream) => {
let view = sub_stream.to_view();
SwarmEvent::StreamOpened { view }
}
Err(_) => SwarmEvent::StreamError {
cid,
error: TransportError::Internal,
},
};
let _ = tx.send(event).await;
f(result)
})
}
pub fn close(&self) {
log::debug!("closing {:?}", self);
let mut stream_muxer = self.stream_muxer.clone();
task::spawn(async move {
let _ = stream_muxer.close().await;
});
}
pub(crate) async fn wait(&mut self) -> Result<(), SwarmError> {
if let Some(h) = self.handle.take() {
h.await;
}
Ok(())
}
pub(crate) fn local_addr(&self) -> Multiaddr {
self.stream_muxer.local_multiaddr()
}
pub(crate) fn remote_addr(&self) -> Multiaddr {
self.stream_muxer.remote_multiaddr()
}
pub(crate) fn local_peer(&self) -> PeerId {
self.stream_muxer.local_peer()
}
pub fn remote_peer(&self) -> PeerId {
self.stream_muxer.remote_peer()
}
pub(crate) fn local_priv_key(&self) -> Keypair {
self.stream_muxer.local_priv_key()
}
pub(crate) fn remote_pub_key(&self) -> PublicKey {
self.stream_muxer.remote_pub_key()
}
pub(crate) fn add_stream(&mut self, view: SubstreamView) {
log::debug!("adding sub {:?} to connection", view);
self.substreams.push(view);
}
pub(crate) fn del_stream(&mut self, sid: StreamId) {
log::debug!("removing sub {:?} from connection", sid);
self.substreams.retain(|s| s.id != sid);
}
pub(crate) fn num_streams(&self) -> usize {
self.substreams.len()
}
pub(crate) fn start_ping(&mut self, timeout: Duration, interval: Duration, max_failures: u32) {
self.ping_running.store(true, Ordering::Relaxed);
let cid = self.id();
let stream_muxer = self.stream_muxer.clone();
let mut tx = self.tx.clone();
let flag = self.ping_running.clone();
let pids = vec![PING_PROTOCOL.into()];
let ctrl = self.ctrl.clone();
let metric = self.metric.clone();
let handle = task::spawn(async move {
let mut fail_cnt: u32 = 0;
loop {
if !flag.load(Ordering::Relaxed) {
break;
}
task::sleep(interval).await;
if !flag.load(Ordering::Relaxed) {
break;
}
let stream_muxer = stream_muxer.clone();
let pids = pids.clone();
let ctrl2 = ctrl.clone();
let r = open_stream_internal(cid, stream_muxer, pids, ctrl2, metric.clone()).await;
let r = match r {
Ok(stream) => {
let view = stream.to_view();
let _ = tx.send(SwarmEvent::StreamOpened { view }).await;
let res = ping::ping(stream, timeout).await;
if res.is_ok() {
fail_cnt = 0;
} else {
fail_cnt += 1;
}
res
}
Err(err) => {
log::info!("Ping protocol not supported: {:?}", err);
Err(err)
}
};
if fail_cnt >= max_failures {
let _ = tx
.send(SwarmEvent::PingResult {
cid,
result: r.map_err(|e| e.into()),
})
.await;
break;
}
}
log::debug!("ping task exiting...");
});
self.ping_handle = Some(handle);
}
pub(crate) async fn stop_ping(&mut self) {
if let Some(h) = self.ping_handle.take() {
log::debug!("stopping Ping service for {:?}...", self.id);
self.ping_running.store(false, Ordering::Relaxed);
h.await;
}
}
pub(crate) fn start_identify(&mut self) {
let cid = self.id();
let stream_muxer = self.stream_muxer.clone();
let mut tx = self.tx.clone();
let ctrl = self.ctrl.clone();
let pids = vec![IDENTIFY_PROTOCOL.into()];
let metric = self.metric.clone();
let handle = task::spawn(async move {
let r = open_stream_internal(cid, stream_muxer, pids, ctrl, metric).await;
let r = match r {
Ok(stream) => {
let view = stream.to_view();
let _ = tx.send(SwarmEvent::StreamOpened { view }).await;
identify::process_message(stream).await
}
Err(err) => {
log::info!("Identify protocol not supported: {:?}", err);
Err(err)
}
};
let _ = tx
.send(SwarmEvent::IdentifyResult {
cid,
result: r.map_err(TransportError::into),
})
.await;
log::debug!("identify task exiting...");
});
self.identify_handle = Some(handle);
}
pub(crate) async fn stop_identify(&mut self) {
if let Some(h) = self.identify_handle.take() {
log::debug!("stopping Identify service for {:?}...", self.id);
h.cancel().await;
}
}
pub(crate) fn start_identify_push(&mut self) {
let cid = self.id();
let stream_muxer = self.stream_muxer.clone();
let pids = vec![IDENTIFY_PUSH_PROTOCOL.into()];
let metric = self.metric.clone();
let mut ctrl = self.ctrl.clone();
let mut tx = self.tx.clone();
let handle = task::spawn(async move {
let (swrm_tx, swrm_rx) = oneshot::channel();
if ctrl.send(SwarmControlCmd::IdentifyInfo(swrm_tx)).await.is_err() {
return;
}
let info = swrm_rx.await.expect("get identify info");
let r = open_stream_internal(cid, stream_muxer, pids, ctrl, metric).await;
match r {
Ok(stream) => {
let view = stream.to_view();
let _ = tx.send(SwarmEvent::StreamOpened { view }).await;
let _ = identify::produce_message(stream, info).await;
}
Err(err) => {
log::info!("Identify push protocol not supported: {:?}", err);
}
}
log::debug!("identify push task exiting...");
});
self.identify_push_handle = Some(handle);
}
pub(crate) async fn stop_identify_push(&mut self) {
if let Some(h) = self.identify_push_handle.take() {
log::debug!("stopping Identify Push service for {:?}...", self.id);
h.cancel().await;
}
}
pub(crate) fn info(&self) -> ConnectionInfo {
let num_inbound_streams = self.substreams.iter().fold(0usize, |mut acc, s| {
if s.dir == Direction::Inbound {
acc += 1;
}
acc
});
let num_outbound_streams = self.substreams.len() - num_inbound_streams;
ConnectionInfo {
la: self.local_addr(),
ra: self.remote_addr(),
local_peer_id: self.local_peer(),
remote_peer_id: self.remote_peer(),
num_inbound_streams,
num_outbound_streams,
}
}
}
#[derive(Debug)]
pub struct ConnectionView {
pub id: ConnectionId,
pub dir: Direction,
pub info: ConnectionInfo,
pub substreams: SmallVec<[SubstreamView; 8]>,
}
impl fmt::Display for ConnectionView {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{} {} RPID({:52}) I/O({}{}) RA({})",
self.id, self.dir, self.info.remote_peer_id, self.info.num_inbound_streams, self.info.num_outbound_streams, self.info.ra
)
}
}
async fn open_stream_internal(
cid: ConnectionId,
mut stream_muxer: IStreamMuxer,
pids: Vec<ProtocolId>,
ctrl: mpsc::Sender<SwarmControlCmd>,
metric: Arc<Metric>,
) -> Result<Substream, TransportError> {
log::debug!("opening substream on {:?} {:?}", cid, pids);
let raw_stream = stream_muxer.open_stream().await?;
let la = stream_muxer.local_multiaddr();
let ra = stream_muxer.remote_multiaddr();
let rpid = stream_muxer.remote_peer();
let negotiator = Negotiator::new_with_protocols(pids);
let result = negotiator.select_one(raw_stream).await;
match result {
Ok((proto, raw_stream)) => {
log::debug!("selected outbound {:?} {:?}", cid, proto);
let ci = ConnectInfo { la, ra, rpid };
let stream = Substream::new(raw_stream, metric.clone(), Direction::Outbound, proto, cid, ci, ctrl);
Ok(stream)
}
Err(err) => {
log::debug!("failed outbound protocol selection {:?} {:?}", cid, err);
Err(TransportError::NegotiationError(err))
}
}
}
#[derive(Debug, Clone)]
pub struct ConnectionLimit {
pub limit: usize,
pub current: usize,
}
impl fmt::Display for ConnectionLimit {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.current, self.limit)
}
}
impl Error for ConnectionLimit {}
#[derive(Debug)]
pub struct ConnectionInfo {
pub la: Multiaddr,
pub ra: Multiaddr,
pub local_peer_id: PeerId,
pub remote_peer_id: PeerId,
pub num_inbound_streams: usize,
pub num_outbound_streams: usize,
}