use futures::{
channel::{mpsc, oneshot},
prelude::*,
};
use libp2prs_core::peerstore::PeerStore;
use libp2prs_core::{Multiaddr, PeerId, ProtocolId, PublicKey};
use std::sync::Arc;
use std::time::Duration;
use crate::connection::{ConnectionId, ConnectionView};
use crate::identify::IdentifyInfo;
use crate::metrics::metric::Metric;
use crate::network::NetworkInfo;
use crate::substream::{StreamId, Substream, SubstreamView};
use crate::SwarmError;
use std::collections::hash_map::IntoIter;
/// Module-local shorthand for `std::result::Result` with `SwarmError` as the error type.
type Result<T> = std::result::Result<T, SwarmError>;
/// Commands sent through the `mpsc::Sender<SwarmControlCmd>` held by `Control`.
///
/// Every variant except `CloseStream` carries (directly, or inside its
/// `DumpCommand` payload) a `oneshot::Sender` on which the reply is delivered.
// NOTE(review): the receiving side of the channel is not visible in this file;
// presumably the swarm's main loop consumes these commands — confirm.
#[derive(Debug)]
pub enum SwarmControlCmd {
/// Issued by `Control::connect_with_addrs`: the target peer, the addresses
/// supplied by the caller, and the reply channel.
Connect(PeerId, Vec<Multiaddr>, oneshot::Sender<Result<()>>),
/// Issued by `Control::new_connection` (flag `true`) and
/// `Control::new_connection_no_routing` (flag `false`).
// NOTE(review): the flag presumably selects whether routing may be used to
// locate the peer — confirm against the command handler.
NewConnection(PeerId, bool, oneshot::Sender<Result<()>>),
/// Issued by `Control::disconnect` for the given peer.
CloseConnection(PeerId, oneshot::Sender<Result<()>>),
/// Issued by `Control::new_stream` (flag `true`) and
/// `Control::new_stream_no_routing` (flag `false`); the reply carries the
/// resulting `Substream` or an error.
NewStream(PeerId, Vec<ProtocolId>, bool, oneshot::Sender<Result<Substream>>),
/// Carries a connection id and a stream id; has no reply channel.
CloseStream(ConnectionId, StreamId),
/// Issued by `Control::self_addrs`; the reply is a list of `Multiaddr`.
SelfAddresses(oneshot::Sender<Vec<Multiaddr>>),
/// Issued by `Control::retrieve_networkinfo`; the reply is a `NetworkInfo`.
NetworkInfo(oneshot::Sender<NetworkInfo>),
/// Issued by `Control::retrieve_identify_info`; the reply is an `IdentifyInfo`.
IdentifyInfo(oneshot::Sender<IdentifyInfo>),
/// Wraps a `DumpCommand`; issued by `Control::dump_connections` and
/// `Control::dump_streams`.
Dump(DumpCommand),
}
/// Payload of `SwarmControlCmd::Dump`: requests for connection or stream views.
#[derive(Debug)]
pub enum DumpCommand {
/// Issued by `Control::dump_connections`; the reply is a list of `ConnectionView`.
// NOTE(review): `None` presumably means "all peers" — confirm against the handler.
Connections(Option<PeerId>, oneshot::Sender<Vec<ConnectionView>>),
/// Issued by `Control::dump_streams` for one peer; the reply is either a list
/// of `SubstreamView` or an error.
Streams(PeerId, oneshot::Sender<Result<Vec<SubstreamView>>>),
}
/// Cloneable handle that bundles a command channel, a peer store and shared metrics.
///
/// The async methods submit a `SwarmControlCmd` on `sender` and wait for the
/// reply; the synchronous methods read or update `peer_store` and `metric` directly.
#[derive(Clone)]
pub struct Control {
/// Sending half of the channel on which `SwarmControlCmd`s are submitted.
sender: mpsc::Sender<SwarmControlCmd>,
/// Peer store backing the pin / key / address / protocol accessors.
peer_store: PeerStore,
/// Shared metric handle read by the traffic accessors.
metric: Arc<Metric>,
}
// NOTE(review): no rationale is recorded for this `allow`; presumably it is here because the
// private `support_protocols` method has no caller in this file — confirm.
#[allow(dead_code)]
impl Control {
    /// Assembles a `Control` from the command sender, the peer store and the shared metric handle.
    pub(crate) fn new(sender: mpsc::Sender<SwarmControlCmd>, peer_store: PeerStore, metric: Arc<Metric>) -> Self {
        Self { sender, peer_store, metric }
    }

    // ---- Metric accessors -------------------------------------------------------------------

    /// Returns the iterator produced by `Metric::get_peers_in_list`.
    ///
    /// By its name this is per-peer inbound traffic; the unit of the `usize` is defined by
    /// `Metric` — confirm there.
    pub fn peer_in_iter(&self) -> IntoIter<PeerId, usize> {
        let metric = &self.metric;
        metric.get_peers_in_list()
    }

    /// Returns the iterator produced by `Metric::get_peers_out_list`.
    ///
    /// By its name this is per-peer outbound traffic; the unit of the `usize` is defined by
    /// `Metric` — confirm there.
    pub fn peer_out_iter(&self) -> IntoIter<PeerId, usize> {
        let metric = &self.metric;
        metric.get_peers_out_list()
    }

    /// Returns the iterator produced by `Metric::get_protocols_in_list`, keyed by protocol name.
    pub fn protocol_in_iter(&self) -> IntoIter<String, usize> {
        let metric = &self.metric;
        metric.get_protocols_in_list()
    }

    /// Returns the iterator produced by `Metric::get_protocols_out_list`, keyed by protocol name.
    pub fn protocol_out_iter(&self) -> IntoIter<String, usize> {
        let metric = &self.metric;
        metric.get_protocols_out_list()
    }

    /// Returns the peer ids reported by `PeerStore::get_peers`.
    pub fn get_peers(&self) -> Vec<PeerId> {
        let store = &self.peer_store;
        store.get_peers()
    }

    /// Returns the pair reported by `Metric::get_recv_count_and_size`.
    ///
    /// Going by the name the tuple is `(count, size)` for received data — confirm against `Metric`.
    pub fn get_recv_count_and_size(&self) -> (usize, usize) {
        let metric = &self.metric;
        metric.get_recv_count_and_size()
    }

    /// Returns the pair reported by `Metric::get_sent_count_and_size`.
    ///
    /// Going by the name the tuple is `(count, size)` for sent data — confirm against `Metric`.
    pub fn get_sent_count_and_size(&self) -> (usize, usize) {
        let metric = &self.metric;
        metric.get_sent_count_and_size()
    }

    /// Looks up `protocol_id` via `Metric::get_protocol_in_and_out`.
    ///
    /// Each half of the returned pair is optional; going by the name the order is `(in, out)`.
    pub fn get_protocol_in_and_out(&self, protocol_id: &str) -> (Option<usize>, Option<usize>) {
        let metric = &self.metric;
        metric.get_protocol_in_and_out(protocol_id)
    }

    /// Looks up `peer_id` via `Metric::get_peer_in_and_out`.
    ///
    /// Each half of the returned pair is optional; going by the name the order is `(in, out)`.
    pub fn get_peer_in_and_out(&self, peer_id: &PeerId) -> (Option<usize>, Option<usize>) {
        let metric = &self.metric;
        metric.get_peer_in_and_out(peer_id)
    }

    // ---- Commands submitted over the channel ------------------------------------------------
    //
    // Each method below creates a oneshot channel, ships its sending half inside the command,
    // and then awaits the receiving half. The `?` operators surface a failed send and a failed
    // reply wait as `SwarmError`.

    /// Submits `SwarmControlCmd::Connect` for `peer_id` with the given `addrs` and waits for the reply.
    ///
    /// # Errors
    /// Fails if the command cannot be sent, if waiting for the reply fails, or if the reply is `Err`.
    pub async fn connect_with_addrs(&mut self, peer_id: PeerId, addrs: Vec<Multiaddr>) -> Result<()> {
        let (reply_tx, reply_rx) = oneshot::channel::<Result<()>>();
        let cmd = SwarmControlCmd::Connect(peer_id, addrs, reply_tx);
        self.sender.send(cmd).await?;
        reply_rx.await?
    }

    /// Submits `SwarmControlCmd::NewConnection` for `peer_id` with the flag set to `true`.
    ///
    /// # Errors
    /// Fails if the command cannot be sent, if waiting for the reply fails, or if the reply is `Err`.
    pub async fn new_connection(&mut self, peer_id: PeerId) -> Result<()> {
        let (reply_tx, reply_rx) = oneshot::channel::<Result<()>>();
        let cmd = SwarmControlCmd::NewConnection(peer_id, true, reply_tx);
        self.sender.send(cmd).await?;
        reply_rx.await?
    }

    /// Submits `SwarmControlCmd::NewConnection` for `peer_id` with the flag set to `false`.
    ///
    /// # Errors
    /// Fails if the command cannot be sent, if waiting for the reply fails, or if the reply is `Err`.
    pub async fn new_connection_no_routing(&mut self, peer_id: PeerId) -> Result<()> {
        let (reply_tx, reply_rx) = oneshot::channel::<Result<()>>();
        let cmd = SwarmControlCmd::NewConnection(peer_id, false, reply_tx);
        self.sender.send(cmd).await?;
        reply_rx.await?
    }

    /// Submits `SwarmControlCmd::CloseConnection` for `peer_id` and waits for the reply.
    ///
    /// # Errors
    /// Fails if the command cannot be sent, if waiting for the reply fails, or if the reply is `Err`.
    pub async fn disconnect(&mut self, peer_id: PeerId) -> Result<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let cmd = SwarmControlCmd::CloseConnection(peer_id, reply_tx);
        self.sender.send(cmd).await?;
        reply_rx.await?
    }

    /// Submits `SwarmControlCmd::NewStream` for `peer_id` and `pids` with the flag set to `true`,
    /// returning the `Substream` carried by the reply.
    ///
    /// # Errors
    /// Fails if the command cannot be sent, if waiting for the reply fails, or if the reply is `Err`.
    pub async fn new_stream(&mut self, peer_id: PeerId, pids: Vec<ProtocolId>) -> Result<Substream> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let cmd = SwarmControlCmd::NewStream(peer_id, pids, true, reply_tx);
        self.sender.send(cmd).await?;
        reply_rx.await?
    }

    /// Submits `SwarmControlCmd::NewStream` for `peer_id` and `pids` with the flag set to `false`,
    /// returning the `Substream` carried by the reply.
    ///
    /// # Errors
    /// Fails if the command cannot be sent, if waiting for the reply fails, or if the reply is `Err`.
    pub async fn new_stream_no_routing(&mut self, peer_id: PeerId, pids: Vec<ProtocolId>) -> Result<Substream> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let cmd = SwarmControlCmd::NewStream(peer_id, pids, false, reply_tx);
        self.sender.send(cmd).await?;
        reply_rx.await?
    }

    /// Submits `SwarmControlCmd::SelfAddresses` and returns the address list carried by the reply.
    ///
    /// # Errors
    /// Fails if the command cannot be sent or if waiting for the reply fails.
    pub async fn self_addrs(&mut self) -> Result<Vec<Multiaddr>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let cmd = SwarmControlCmd::SelfAddresses(reply_tx);
        self.sender.send(cmd).await?;
        let addrs = reply_rx.await?;
        Ok(addrs)
    }

    /// Submits `SwarmControlCmd::NetworkInfo` and returns the `NetworkInfo` carried by the reply.
    ///
    /// # Errors
    /// Fails if the command cannot be sent or if waiting for the reply fails.
    pub async fn retrieve_networkinfo(&mut self) -> Result<NetworkInfo> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let cmd = SwarmControlCmd::NetworkInfo(reply_tx);
        self.sender.send(cmd).await?;
        let info = reply_rx.await?;
        Ok(info)
    }

    /// Submits `SwarmControlCmd::IdentifyInfo` and returns the `IdentifyInfo` carried by the reply.
    ///
    /// # Errors
    /// Fails if the command cannot be sent or if waiting for the reply fails.
    pub async fn retrieve_identify_info(&mut self) -> Result<IdentifyInfo> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let cmd = SwarmControlCmd::IdentifyInfo(reply_tx);
        self.sender.send(cmd).await?;
        let info = reply_rx.await?;
        Ok(info)
    }

    /// Submits `DumpCommand::Connections` (wrapped in `SwarmControlCmd::Dump`) with the optional
    /// `peer_id` and returns the connection views carried by the reply.
    ///
    /// # Errors
    /// Fails if the command cannot be sent or if waiting for the reply fails.
    pub async fn dump_connections(&mut self, peer_id: Option<PeerId>) -> Result<Vec<ConnectionView>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let cmd = SwarmControlCmd::Dump(DumpCommand::Connections(peer_id, reply_tx));
        self.sender.send(cmd).await?;
        let views = reply_rx.await?;
        Ok(views)
    }

    /// Submits `DumpCommand::Streams` (wrapped in `SwarmControlCmd::Dump`) for `peer_id` and
    /// returns the substream views carried by the reply.
    ///
    /// # Errors
    /// Fails if the command cannot be sent, if waiting for the reply fails, or if the reply is `Err`.
    pub async fn dump_streams(&mut self, peer_id: PeerId) -> Result<Vec<SubstreamView>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let cmd = SwarmControlCmd::Dump(DumpCommand::Streams(peer_id, reply_tx));
        self.sender.send(cmd).await?;
        reply_rx.await?
    }

    /// Calls `close_channel` on the command sender.
    ///
    /// The function is declared `async` but contains no await point.
    pub async fn close(&mut self) {
        let Self { sender, .. } = self;
        sender.close_channel();
    }

    // ---- Peer store pass-throughs -----------------------------------------------------------

    /// Forwards to `PeerStore::pin` for `peer_id`.
    pub fn pin(&self, peer_id: &PeerId) {
        self.peer_store.pin(peer_id);
    }

    /// Forwards to `PeerStore::unpin` for `peer_id`; any value it returns is discarded.
    pub fn unpin(&self, peer_id: &PeerId) {
        self.peer_store.unpin(peer_id);
    }

    /// Returns what `PeerStore::pinned` reports for `peer_id`.
    pub fn pinned(&self, peer_id: &PeerId) -> bool {
        let store = &self.peer_store;
        store.pinned(peer_id)
    }

    /// Returns the public key `PeerStore::get_key` has for `peer_id`, if any.
    pub fn get_key(&self, peer_id: &PeerId) -> Option<PublicKey> {
        let store = &self.peer_store;
        store.get_key(peer_id)
    }

    /// Returns the addresses `PeerStore::get_addrs` has for `peer_id`, if any.
    pub fn get_addrs(&self, peer_id: &PeerId) -> Option<Vec<Multiaddr>> {
        let store = &self.peer_store;
        store.get_addrs(peer_id)
    }

    /// Forwards a single address and its `ttl` to `PeerStore::add_addr`.
    pub fn add_addr(&self, peer_id: &PeerId, addr: Multiaddr, ttl: Duration) {
        self.peer_store.add_addr(peer_id, addr, ttl);
    }

    /// Forwards a list of addresses and their shared `ttl` to `PeerStore::add_addrs`.
    pub fn add_addrs(&self, peer_id: &PeerId, addrs: Vec<Multiaddr>, ttl: Duration) {
        self.peer_store.add_addrs(peer_id, addrs, ttl);
    }

    /// Forwards to `PeerStore::clear_addrs` for `peer_id`.
    pub fn clear_addrs(&self, peer_id: &PeerId) {
        self.peer_store.clear_addrs(peer_id);
    }

    /// Forwards `new_ttl` to `PeerStore::update_addr` for `peer_id`.
    pub fn update_addr(&self, peer_id: &PeerId, new_ttl: Duration) {
        self.peer_store.update_addr(peer_id, new_ttl);
    }

    /// Forwards `protos` to `PeerStore::add_protocols`; any value it returns is discarded.
    pub fn add_protocols(&self, peer_id: &PeerId, protos: Vec<String>) {
        self.peer_store.add_protocols(peer_id, protos);
    }

    /// Forwards to `PeerStore::clear_protocols`; any value it returns is discarded.
    pub fn clear_protocols(&self, peer_id: &PeerId) {
        self.peer_store.clear_protocols(peer_id);
    }

    /// Returns the protocols `PeerStore::get_protocols` has for `peer_id`, if any.
    pub fn get_protocols(&self, peer_id: &PeerId) -> Option<Vec<String>> {
        let store = &self.peer_store;
        store.get_protocols(peer_id)
    }

    /// Returns the result of `PeerStore::first_supported_protocol` for `peer_id` and `protos`.
    pub fn first_supported_protocol(&self, peer_id: &PeerId, protos: Vec<String>) -> Option<String> {
        let store = &self.peer_store;
        store.first_supported_protocol(peer_id, protos)
    }

    /// Returns the result of `PeerStore::support_protocols` for `peer_id` and `protos`.
    ///
    /// Unlike the neighbouring accessors this one is private.
    fn support_protocols(&self, peer_id: &PeerId, protos: Vec<String>) -> Option<Vec<String>> {
        let store = &self.peer_store;
        store.support_protocols(peer_id, protos)
    }
}