use crate::gossipsub::{
config::default_gossipsub_config,
topics::{
CON_VOTE_GOSSIP_TOPIC,
NEW_BLOCK_GOSSIP_TOPIC,
NEW_TX_GOSSIP_TOPIC,
},
};
use fuel_core_interfaces::{
common::secrecy::Zeroize,
model::Genesis,
};
use futures::{
future,
AsyncRead,
AsyncWrite,
Future,
FutureExt,
TryFutureExt,
};
use libp2p::{
core::{
muxing::StreamMuxerBox,
transport::Boxed,
upgrade::{
read_length_prefixed,
write_length_prefixed,
},
UpgradeInfo,
},
gossipsub::GossipsubConfig,
identity::{
secp256k1::SecretKey,
Keypair,
},
mplex,
noise::{
self,
NoiseAuthenticated,
NoiseError,
NoiseOutput,
Protocol,
},
tcp::{
tokio::Transport as TokioTcpTransport,
Config as TcpConfig,
},
yamux,
InboundUpgrade,
Multiaddr,
OutboundUpgrade,
PeerId,
Transport,
};
use std::{
collections::HashSet,
error::Error,
fmt,
io,
net::{
IpAddr,
Ipv4Addr,
},
pin::Pin,
time::Duration,
};
/// Default used by `P2PConfig::default` for both `set_request_timeout` and
/// `set_connection_keep_alive`.
const REQ_RES_TIMEOUT: Duration = Duration::from_secs(20);
/// Value handed to `MplexConfig::set_max_buffer_size` in `build_transport`.
const MAX_NUM_OF_FRAMES_BUFFERED: usize = 256;
/// Duration passed to `.timeout(..)` on the upgraded transport in `build_transport`.
const TRANSPORT_TIMEOUT: Duration = Duration::from_secs(20);
/// 32-byte chain checksum.
///
/// `P2PConfig::init` fills it from the genesis root, and `FuelUpgrade`
/// exchanges it while a connection is being upgraded. The `Default` value is
/// all zeros.
#[derive(Debug, Clone, Copy, Default)]
pub struct Checksum([u8; 32]);

impl From<[u8; 32]> for Checksum {
    /// Wraps the raw 32 bytes without any transformation.
    fn from(value: [u8; 32]) -> Self {
        Checksum(value)
    }
}
/// Configuration of the p2p layer.
///
/// The `State` parameter records whether the config has been through `init`:
/// `P2PConfig::default` produces a `P2PConfig<NotInitialized>`, and `init`
/// turns it into a `P2PConfig<Initialized>` after deriving `checksum` from the
/// genesis. `build_transport` only accepts the `Initialized` form (the default
/// type parameter).
#[derive(Clone, Debug)]
pub struct P2PConfig<State = Initialized> {
/// Node identity keypair; `build_transport` uses it to authenticate the Noise
/// DH keys.
pub keypair: Keypair,
/// Name of the network, as passed to `P2PConfig::default`.
pub network_name: String,
/// Chain checksum exchanged by `FuelUpgrade`. All zeros after `default`,
/// replaced with the genesis root by `init`.
pub checksum: Checksum,
/// IP address — presumably the one the node listens on (TODO confirm against
/// the swarm setup); defaults to `0.0.0.0`.
pub address: IpAddr,
/// Optional multiaddr — presumably the externally reachable address of this
/// node (TODO confirm); defaults to `None`.
pub public_address: Option<Multiaddr>,
/// TCP port; defaults to `0`. NOTE(review): presumably the listening port,
/// with `0` letting the OS choose — confirm against the swarm setup.
pub tcp_port: u16,
/// Maximum block size — presumably in bytes (TODO confirm); defaults to
/// `100_000`.
pub max_block_size: usize,
// The six fields below look discovery-related; they are not read by the code
// visible in this part of the file, so their exact semantics should be
// confirmed where they are consumed.
pub bootstrap_nodes: Vec<Multiaddr>,
pub enable_mdns: bool,
pub max_peers_connected: usize,
pub allow_private_addresses: bool,
pub random_walk: Option<Duration>,
pub connection_idle_timeout: Option<Duration>,
/// Addresses of the reserved peers. When `reserved_nodes_only_mode` is set,
/// `build_transport` extracts a `PeerId` from every entry and panics if an
/// entry does not contain one.
pub reserved_nodes: Vec<Multiaddr>,
/// When `true`, `build_transport` wraps the Noise upgrade so that only peers
/// whose `PeerId` appears in `reserved_nodes` pass authentication.
pub reserved_nodes_only_mode: bool,
/// Defaults to 5 seconds. NOTE(review): presumably the interval of the
/// identify protocol — confirm where it is consumed.
pub identify_interval: Option<Duration>,
/// Defaults to 3 seconds. NOTE(review): presumably the interval of the
/// peer-info/ping behaviour — confirm where it is consumed.
pub info_interval: Option<Duration>,
/// Gossipsub configuration; defaults to `default_gossipsub_config()`.
pub gossipsub_config: GossipsubConfig,
/// Gossipsub topic names; defaults to the new-tx, new-block and
/// consensus-vote topic constants.
pub topics: Vec<String>,
/// Defaults to `REQ_RES_TIMEOUT`. Presumably the request-response request
/// timeout (TODO confirm).
pub set_request_timeout: Duration,
/// Defaults to `REQ_RES_TIMEOUT`. Presumably the request-response connection
/// keep-alive (TODO confirm).
pub set_connection_keep_alive: Duration,
/// Defaults to `false`. Presumably toggles metrics collection (TODO confirm).
pub metrics: bool,
/// Initialization marker: `NotInitialized` or `Initialized`.
pub state: State,
}
/// Marker for a `P2PConfig` whose `checksum` has been set by `init`.
///
/// The private unit field keeps code outside this module from constructing the
/// marker directly.
#[derive(Clone, Debug)]
pub struct Initialized(());
/// Marker for a `P2PConfig` that has not been through `init` yet.
#[derive(Clone, Debug)]
pub struct NotInitialized;
impl P2PConfig<NotInitialized> {
pub fn init(self, mut genesis: Genesis) -> anyhow::Result<P2PConfig<Initialized>> {
use fuel_chain_config::GenesisCommitment;
Ok(P2PConfig {
keypair: self.keypair,
network_name: self.network_name,
checksum: genesis.root()?.into(),
address: self.address,
public_address: self.public_address,
tcp_port: self.tcp_port,
max_block_size: self.max_block_size,
bootstrap_nodes: self.bootstrap_nodes,
enable_mdns: self.enable_mdns,
max_peers_connected: self.max_peers_connected,
allow_private_addresses: self.allow_private_addresses,
random_walk: self.random_walk,
connection_idle_timeout: self.connection_idle_timeout,
reserved_nodes: self.reserved_nodes,
reserved_nodes_only_mode: self.reserved_nodes_only_mode,
identify_interval: self.identify_interval,
info_interval: self.info_interval,
gossipsub_config: self.gossipsub_config,
topics: self.topics,
set_request_timeout: self.set_request_timeout,
set_connection_keep_alive: self.set_connection_keep_alive,
metrics: self.metrics,
state: Initialized(()),
})
}
}
/// Builds a libp2p secp256k1 [`Keypair`] from raw secret-key bytes.
///
/// The bytes are taken as `AsMut<[u8]>` because `SecretKey::from_bytes` needs
/// mutable access to the buffer. NOTE(review): libp2p documents that it zeroes
/// the input on success — confirm for the libp2p version in use.
///
/// # Errors
/// Returns the decoding error from `SecretKey::from_bytes` when the bytes are
/// not a valid secp256k1 secret key.
pub fn convert_to_libp2p_keypair(
    secret_key_bytes: impl AsMut<[u8]>,
) -> anyhow::Result<Keypair> {
    let secp256k1_keypair = SecretKey::from_bytes(secret_key_bytes)?.into();
    Ok(Keypair::Secp256k1(secp256k1_keypair))
}
impl P2PConfig<NotInitialized> {
pub fn default(network_name: &str) -> Self {
let keypair = Keypair::generate_secp256k1();
Self {
keypair,
network_name: network_name.into(),
checksum: Default::default(),
address: IpAddr::V4(Ipv4Addr::from([0, 0, 0, 0])),
public_address: None,
tcp_port: 0,
max_block_size: 100_000,
bootstrap_nodes: vec![],
enable_mdns: false,
max_peers_connected: 50,
allow_private_addresses: true,
random_walk: Some(Duration::from_secs(5)),
connection_idle_timeout: Some(Duration::from_secs(120)),
reserved_nodes: vec![],
reserved_nodes_only_mode: false,
topics: vec![
NEW_TX_GOSSIP_TOPIC.into(),
NEW_BLOCK_GOSSIP_TOPIC.into(),
CON_VOTE_GOSSIP_TOPIC.into(),
],
gossipsub_config: default_gossipsub_config(),
set_request_timeout: REQ_RES_TIMEOUT,
set_connection_keep_alive: REQ_RES_TIMEOUT,
info_interval: Some(Duration::from_secs(3)),
identify_interval: Some(Duration::from_secs(5)),
metrics: false,
state: NotInitialized,
}
}
}
#[cfg(any(feature = "test-helpers", test))]
impl P2PConfig<Initialized> {
    /// Test helper: builds the default config for `network_name` and
    /// initializes it with a default `Genesis`.
    ///
    /// # Panics
    /// Panics if `init` fails for the default genesis.
    pub fn default_initialized(network_name: &str) -> Self {
        let not_initialized = P2PConfig::<NotInitialized>::default(network_name);
        not_initialized
            .init(Genesis::default())
            .expect("Expected correct initialization of config")
    }
}
/// Builds the boxed libp2p transport described by `p2p_config`.
///
/// The stack, bottom to top:
/// 1. TCP (port reuse and `nodelay` enabled), also reachable through
///    WebSocket, wrapped in the system DNS resolver;
/// 2. Noise XX authentication keyed by `p2p_config.keypair`, additionally
///    restricted to `p2p_config.reserved_nodes` when
///    `reserved_nodes_only_mode` is set;
/// 3. the `FuelUpgrade` checksum exchange;
/// 4. stream multiplexing via a `SelectUpgrade` of yamux and mplex;
/// 5. a `TRANSPORT_TIMEOUT` timeout.
///
/// # Panics
/// Panics if the system DNS configuration cannot be loaded, if the Noise keys
/// cannot be derived from the identity keypair, or (in reserved-nodes-only
/// mode) if a reserved address carries no peer id.
pub(crate) fn build_transport(p2p_config: &P2PConfig) -> Boxed<(PeerId, StreamMuxerBox)> {
    let new_tcp =
        || TokioTcpTransport::new(TcpConfig::new().port_reuse(true).nodelay(true));

    // Plain TCP and WebSocket-over-TCP each get their own TCP transport.
    let plain_tcp = new_tcp();
    let ws_or_tcp = libp2p::websocket::WsConfig::new(new_tcp()).or_transport(plain_tcp);
    let base_transport = libp2p::dns::TokioDnsConfig::system(ws_or_tcp)
        .unwrap()
        .upgrade(libp2p::core::upgrade::Version::V1);

    let static_dh_keys = noise::Keypair::<noise::X25519Spec>::new()
        .into_authentic(&p2p_config.keypair)
        .expect("Noise key generation failed");
    let noise_upgrade = noise::NoiseConfig::xx(static_dh_keys).into_authenticated();

    let mut mplex_upgrade = mplex::MplexConfig::new();
    mplex_upgrade.set_max_buffer_size(MAX_NUM_OF_FRAMES_BUFFERED);
    let muxer_upgrade = libp2p::core::upgrade::SelectUpgrade::new(
        yamux::YamuxConfig::default(),
        mplex_upgrade,
    );

    let checksum_upgrade = FuelUpgrade::new(p2p_config.checksum);

    // The two authentication upgrades have different types, so each branch
    // has to finish and box its own chain.
    if p2p_config.reserved_nodes_only_mode {
        let reserved_only_noise =
            NoiseWithReservedNodes::new(noise_upgrade, &p2p_config.reserved_nodes);
        return base_transport
            .authenticate(reserved_only_noise)
            .apply(checksum_upgrade)
            .multiplex(muxer_upgrade)
            .timeout(TRANSPORT_TIMEOUT)
            .boxed()
    }

    base_transport
        .authenticate(noise_upgrade)
        .apply(checksum_upgrade)
        .multiplex(muxer_upgrade)
        .timeout(TRANSPORT_TIMEOUT)
        .boxed()
}
/// Noise authentication upgrade that, after the wrapped handshake succeeds,
/// only lets through peers whose `PeerId` is in `reserved_nodes`.
#[derive(Clone)]
struct NoiseWithReservedNodes<P, C: Zeroize, R> {
/// The wrapped Noise upgrade that performs the actual handshake.
noise_authenticated: NoiseAuthenticated<P, C, R>,
/// Peer ids that are allowed to complete the upgrade.
reserved_nodes: HashSet<PeerId>,
}
impl<P, C: Zeroize, R> NoiseWithReservedNodes<P, C, R> {
    /// Wraps `noise_authenticated` so that only the peers named by
    /// `reserved_nodes` can complete the handshake.
    ///
    /// The `PeerId` of every reserved address is extracted up front and stored
    /// in a set for lookup during upgrades.
    ///
    /// # Panics
    /// Panics if an address in `reserved_nodes` does not end with a
    /// `/p2p/<peer-id>` component. Such an entry could never match a remote
    /// peer, so it is reported as a configuration error, naming the offending
    /// address, instead of being silently ignored.
    fn new(
        noise_authenticated: NoiseAuthenticated<P, C, R>,
        reserved_nodes: &[Multiaddr],
    ) -> Self {
        let reserved_nodes = reserved_nodes
            .iter()
            .map(|address| {
                PeerId::try_from_multiaddr(address).unwrap_or_else(|| {
                    panic!(
                        "Reserved node address `{}` must end with a `/p2p/<peer-id>` component",
                        address
                    )
                })
            })
            .collect();

        Self {
            noise_authenticated,
            reserved_nodes,
        }
    }
}
/// Decides whether an already authenticated peer may keep its connection.
///
/// Resolves immediately to `(remote_peer_id, io)` when the peer is in
/// `reserved_nodes`, and to `NoiseError::AuthenticationFailed` otherwise.
fn accept_reserved_node<T>(
    reserved_nodes: &HashSet<PeerId>,
    remote_peer_id: PeerId,
    io: NoiseOutput<T>,
) -> future::Ready<Result<(PeerId, NoiseOutput<T>), NoiseError>> {
    let verdict = if reserved_nodes.contains(&remote_peer_id) {
        Ok((remote_peer_id, io))
    } else {
        Err(NoiseError::AuthenticationFailed)
    };
    future::ready(verdict)
}
/// Advertises exactly the protocols of the wrapped Noise upgrade; the
/// reserved-node filter adds no protocol of its own.
impl<P, C: Zeroize, R> UpgradeInfo for NoiseWithReservedNodes<P, C, R>
where
    NoiseAuthenticated<P, C, R>: UpgradeInfo,
{
    type Info = <NoiseAuthenticated<P, C, R> as UpgradeInfo>::Info;
    type InfoIter = <NoiseAuthenticated<P, C, R> as UpgradeInfo>::InfoIter;

    fn protocol_info(&self) -> Self::InfoIter {
        <NoiseAuthenticated<P, C, R> as UpgradeInfo>::protocol_info(&self.noise_authenticated)
    }
}
/// Inbound side: runs the wrapped Noise handshake, then rejects the remote
/// unless its `PeerId` is one of the reserved nodes.
impl<T, P, C, R> InboundUpgrade<T> for NoiseWithReservedNodes<P, C, R>
where
    NoiseAuthenticated<P, C, R>: UpgradeInfo
        + InboundUpgrade<T, Output = (PeerId, NoiseOutput<T>), Error = NoiseError>
        + 'static,
    <NoiseAuthenticated<P, C, R> as InboundUpgrade<T>>::Future: Send,
    T: AsyncRead + AsyncWrite + Send + 'static,
    C: Protocol<C> + AsRef<[u8]> + Zeroize + Send + 'static,
{
    type Output = (PeerId, NoiseOutput<T>);
    type Error = NoiseError;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

    fn upgrade_inbound(self, socket: T, info: Self::Info) -> Self::Future {
        // Split `self` so the handshake consumes the noise config while the
        // filter closure owns only the reserved set.
        let Self {
            noise_authenticated,
            reserved_nodes,
        } = self;

        let filtered_handshake = noise_authenticated
            .upgrade_inbound(socket, info)
            .and_then(move |(remote_peer_id, io)| {
                accept_reserved_node(&reserved_nodes, remote_peer_id, io)
            });

        Box::pin(filtered_handshake)
    }
}
/// Outbound side: runs the wrapped Noise handshake, then drops the connection
/// unless the dialed peer's `PeerId` is one of the reserved nodes.
impl<T, P, C, R> OutboundUpgrade<T> for NoiseWithReservedNodes<P, C, R>
where
    NoiseAuthenticated<P, C, R>: UpgradeInfo
        + OutboundUpgrade<T, Output = (PeerId, NoiseOutput<T>), Error = NoiseError>
        + 'static,
    <NoiseAuthenticated<P, C, R> as OutboundUpgrade<T>>::Future: Send,
    T: AsyncRead + AsyncWrite + Send + 'static,
    C: Protocol<C> + AsRef<[u8]> + Zeroize + Send + 'static,
{
    type Output = (PeerId, NoiseOutput<T>);
    type Error = NoiseError;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

    fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future {
        // Split `self` so the handshake consumes the noise config while the
        // filter closure owns only the reserved set.
        let Self {
            noise_authenticated,
            reserved_nodes,
        } = self;

        let filtered_handshake = noise_authenticated
            .upgrade_outbound(socket, info)
            .and_then(move |(remote_peer_id, io)| {
                accept_reserved_node(&reserved_nodes, remote_peer_id, io)
            });

        Box::pin(filtered_handshake)
    }
}
/// Connection upgrade that checks both ends run the same chain.
///
/// The outbound side writes its checksum as a length-prefixed message; the
/// inbound side reads it and fails the upgrade when it differs from its own.
#[derive(Debug, Clone)]
struct FuelUpgrade {
    /// Checksum of the local node.
    checksum: Checksum,
}

impl FuelUpgrade {
    /// Creates the upgrade for the given local `checksum`.
    fn new(checksum: Checksum) -> Self {
        FuelUpgrade { checksum }
    }
}
/// Reasons a `FuelUpgrade` handshake can fail.
#[derive(Debug)]
enum FuelUpgradeError {
    /// The checksum received from the remote differs from the local one.
    IncorrectChecksum,
    /// Reading from or writing to the socket failed.
    Io(io::Error),
}

impl fmt::Display for FuelUpgradeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::IncorrectChecksum => f.write_str("Fuel node checksum does not match, either ChainId or ChainConfig are not the same, or both."),
            // I/O failures are shown exactly as the underlying error prints.
            Self::Io(io_err) => f.write_fmt(format_args!("{}", io_err)),
        }
    }
}

impl From<io::Error> for FuelUpgradeError {
    /// Lets `?` lift socket errors into the upgrade error.
    fn from(e: io::Error) -> Self {
        Self::Io(e)
    }
}

impl Error for FuelUpgradeError {
    /// Exposes the wrapped I/O error as the source; a checksum mismatch has
    /// no underlying cause.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::IncorrectChecksum => None,
            Self::Io(io_err) => Some(io_err),
        }
    }
}
/// Advertises the single protocol id `/fuel/upgrade/0`.
impl UpgradeInfo for FuelUpgrade {
    type Info = &'static [u8];
    type InfoIter = std::iter::Once<Self::Info>;

    fn protocol_info(&self) -> Self::InfoIter {
        const PROTOCOL_ID: &[u8] = b"/fuel/upgrade/0";
        std::iter::once(PROTOCOL_ID)
    }
}
/// Inbound side of the checksum exchange: reads the remote checksum and
/// compares it with the local one.
impl<C> InboundUpgrade<C> for FuelUpgrade
where
    C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    type Output = C;
    type Error = FuelUpgradeError;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

    /// Resolves to the untouched socket when the checksums match.
    ///
    /// Fails with `FuelUpgradeError::IncorrectChecksum` on a mismatch and with
    /// `FuelUpgradeError::Io` when the length-prefixed read fails. The read is
    /// capped at the checksum length.
    fn upgrade_inbound(self, mut socket: C, _: Self::Info) -> Self::Future {
        let expected = self.checksum.0;

        let verify_remote = async move {
            let received = read_length_prefixed(&mut socket, expected.len()).await?;
            if received == expected {
                Ok(socket)
            } else {
                Err(FuelUpgradeError::IncorrectChecksum)
            }
        };

        verify_remote.boxed()
    }
}
impl<C> OutboundUpgrade<C> for FuelUpgrade
where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = C;
type Error = FuelUpgradeError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
fn upgrade_outbound(self, mut socket: C, _: Self::Info) -> Self::Future {
async move {
write_length_prefixed(&mut socket, &self.checksum.0).await?;
Ok(socket)
}
.boxed()
}
}