use crate::{
gossipsub::{
config::default_gossipsub_config,
topics::{
CON_VOTE_GOSSIP_TOPIC,
NEW_BLOCK_GOSSIP_TOPIC,
NEW_TX_GOSSIP_TOPIC,
},
},
heartbeat::HeartbeatConfig,
peer_manager::ConnectionState,
};
use fuel_core_types::blockchain::consensus::Genesis;
use libp2p::{
core::{
muxing::StreamMuxerBox,
transport::Boxed,
},
gossipsub::GossipsubConfig,
identity::{
secp256k1::SecretKey,
Keypair,
},
mplex,
noise::{self,},
tcp::{
tokio::Transport as TokioTcpTransport,
Config as TcpConfig,
},
yamux,
Multiaddr,
PeerId,
Transport,
};
use std::{
collections::HashSet,
net::{
IpAddr,
Ipv4Addr,
},
sync::{
Arc,
RwLock,
},
time::Duration,
};
use self::{
connection_tracker::ConnectionTracker,
fuel_authenticated::FuelAuthenticated,
fuel_upgrade::{
Checksum,
FuelUpgrade,
},
guarded_node::GuardedNode,
};
mod connection_tracker;
mod fuel_authenticated;
mod fuel_upgrade;
mod guarded_node;
const REQ_RES_TIMEOUT: Duration = Duration::from_secs(20);
pub const MAX_RESPONSE_SIZE: usize = 18 * 1024 * 1024;
const TRANSPORT_TIMEOUT: Duration = Duration::from_secs(20);
#[derive(Clone, Debug)]
pub struct Config<State = Initialized> {
pub keypair: Keypair,
pub network_name: String,
pub checksum: Checksum,
pub address: IpAddr,
pub public_address: Option<Multiaddr>,
pub tcp_port: u16,
pub max_block_size: usize,
pub bootstrap_nodes: Vec<Multiaddr>,
pub enable_mdns: bool,
pub allow_private_addresses: bool,
pub random_walk: Option<Duration>,
pub connection_idle_timeout: Option<Duration>,
pub reserved_nodes: Vec<Multiaddr>,
pub reserved_nodes_only_mode: bool,
pub max_peers_connected: u32,
pub max_connections_per_peer: u32,
pub identify_interval: Option<Duration>,
pub info_interval: Option<Duration>,
pub gossipsub_config: GossipsubConfig,
pub topics: Vec<String>,
pub heartbeat_config: HeartbeatConfig,
pub set_request_timeout: Duration,
pub set_connection_keep_alive: Duration,
pub metrics: bool,
pub state: State,
}
#[derive(Clone, Debug)]
pub struct Initialized(());
#[derive(Clone, Debug)]
pub struct NotInitialized;
impl Config<NotInitialized> {
pub fn init(self, genesis: Genesis) -> anyhow::Result<Config<Initialized>> {
use fuel_core_chain_config::GenesisCommitment;
Ok(Config {
keypair: self.keypair,
network_name: self.network_name,
checksum: genesis.root()?.into(),
address: self.address,
public_address: self.public_address,
tcp_port: self.tcp_port,
max_block_size: self.max_block_size,
bootstrap_nodes: self.bootstrap_nodes,
enable_mdns: self.enable_mdns,
max_peers_connected: self.max_peers_connected,
max_connections_per_peer: self.max_connections_per_peer,
allow_private_addresses: self.allow_private_addresses,
random_walk: self.random_walk,
connection_idle_timeout: self.connection_idle_timeout,
reserved_nodes: self.reserved_nodes,
reserved_nodes_only_mode: self.reserved_nodes_only_mode,
identify_interval: self.identify_interval,
info_interval: self.info_interval,
gossipsub_config: self.gossipsub_config,
topics: self.topics,
heartbeat_config: self.heartbeat_config,
set_request_timeout: self.set_request_timeout,
set_connection_keep_alive: self.set_connection_keep_alive,
metrics: self.metrics,
state: Initialized(()),
})
}
}
pub fn convert_to_libp2p_keypair(
secret_key_bytes: impl AsMut<[u8]>,
) -> anyhow::Result<Keypair> {
let secret_key = SecretKey::from_bytes(secret_key_bytes)?;
Ok(Keypair::Secp256k1(secret_key.into()))
}
impl Config<NotInitialized> {
pub fn default(network_name: &str) -> Self {
let keypair = Keypair::generate_secp256k1();
Self {
keypair,
network_name: network_name.into(),
checksum: Default::default(),
address: IpAddr::V4(Ipv4Addr::from([0, 0, 0, 0])),
public_address: None,
tcp_port: 0,
max_block_size: MAX_RESPONSE_SIZE,
bootstrap_nodes: vec![],
enable_mdns: false,
max_peers_connected: 50,
max_connections_per_peer: 3,
allow_private_addresses: true,
random_walk: Some(Duration::from_millis(500)),
connection_idle_timeout: Some(Duration::from_secs(120)),
reserved_nodes: vec![],
reserved_nodes_only_mode: false,
topics: vec![
NEW_TX_GOSSIP_TOPIC.into(),
NEW_BLOCK_GOSSIP_TOPIC.into(),
CON_VOTE_GOSSIP_TOPIC.into(),
],
gossipsub_config: default_gossipsub_config(),
heartbeat_config: HeartbeatConfig::default(),
set_request_timeout: REQ_RES_TIMEOUT,
set_connection_keep_alive: REQ_RES_TIMEOUT,
info_interval: Some(Duration::from_secs(3)),
identify_interval: Some(Duration::from_secs(5)),
metrics: false,
state: NotInitialized,
}
}
}
#[cfg(any(feature = "test-helpers", test))]
impl Config<Initialized> {
pub fn default_initialized(network_name: &str) -> Self {
Config::<NotInitialized>::default(network_name)
.init(Default::default())
.expect("Expected correct initialization of config")
}
}
pub(crate) fn build_transport(
p2p_config: &Config,
) -> (
Boxed<(PeerId, StreamMuxerBox)>,
Arc<RwLock<ConnectionState>>,
) {
let transport = {
let generate_tcp_transport =
|| TokioTcpTransport::new(TcpConfig::new().port_reuse(true).nodelay(true));
let tcp = generate_tcp_transport();
let ws_tcp =
libp2p::websocket::WsConfig::new(generate_tcp_transport()).or_transport(tcp);
libp2p::dns::TokioDnsConfig::system(ws_tcp).unwrap()
}
.upgrade(libp2p::core::upgrade::Version::V1);
let noise_authenticated = {
let dh_keys = noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&p2p_config.keypair)
.expect("Noise key generation failed");
noise::NoiseConfig::xx(dh_keys).into_authenticated()
};
let multiplex_config = {
let mplex_config = mplex::MplexConfig::default();
let mut yamux_config = yamux::YamuxConfig::default();
yamux_config.set_max_buffer_size(MAX_RESPONSE_SIZE);
libp2p::core::upgrade::SelectUpgrade::new(yamux_config, mplex_config)
};
let fuel_upgrade = FuelUpgrade::new(p2p_config.checksum);
let connection_state = ConnectionState::new();
let transport = if p2p_config.reserved_nodes_only_mode {
let guarded_node = GuardedNode::new(&p2p_config.reserved_nodes);
let fuel_authenticated =
FuelAuthenticated::new(noise_authenticated, guarded_node);
transport
.authenticate(fuel_authenticated)
.apply(fuel_upgrade)
.multiplex(multiplex_config)
.timeout(TRANSPORT_TIMEOUT)
.boxed()
} else {
let connection_tracker =
ConnectionTracker::new(&p2p_config.reserved_nodes, connection_state.clone());
let fuel_authenticated =
FuelAuthenticated::new(noise_authenticated, connection_tracker);
transport
.authenticate(fuel_authenticated)
.apply(fuel_upgrade)
.multiplex(multiplex_config)
.timeout(TRANSPORT_TIMEOUT)
.boxed()
};
(transport, connection_state)
}
fn peer_ids_set_from(multiaddr: &[Multiaddr]) -> HashSet<PeerId> {
multiaddr
.iter()
.map(|address| PeerId::try_from_multiaddr(address).unwrap())
.collect()
}