use crate::{
config::{
Config,
MAX_RESPONSE_SIZE,
},
TryPeerId,
};
use fuel_core_metrics::p2p_metrics::p2p_metrics;
use libp2p::gossipsub::{
self,
MessageAuthenticity,
MessageId,
MetricsConfig,
PeerScoreParams,
PeerScoreThresholds,
Topic,
TopicScoreParams,
};
use sha2::{
Digest,
Sha256,
};
use std::time::Duration;
use super::topics::{
GossipTopic,
NEW_TX_GOSSIP_TOPIC,
};
// Timing and scoring constants shared by the gossipsub configuration below.

/// Number of [`SLOT`]s that make up one [`EPOCH`].
const SLOTS_PER_EPOCH: u64 = 32;
/// Base time unit (6 seconds) from which the scoring durations in this module are derived.
const SLOT: Duration = Duration::from_secs(6);
/// `SLOTS_PER_EPOCH` slots expressed as a duration (32 * 6 s = 192 s).
const EPOCH: Duration = Duration::from_secs(SLOTS_PER_EPOCH * SLOT.as_secs());
/// Used as `PeerScoreParams::decay_to_zero` and as the fraction a value is
/// reduced to over the decay time passed to `score_parameter_decay`.
const DECAY_TO_ZERO: f64 = 0.8;
/// Interval between two score decay steps; one [`SLOT`].
const DECAY_INTERVAL: Duration = SLOT;
/// Value passed to `mesh_n` in `default_gossipsub_config`.
const MESH_SIZE: usize = 8;
/// Topic weight applied to the new-transaction gossip topic.
const NEW_TX_GOSSIP_WEIGHT: f64 = 0.05;
/// Score used as `graylist_threshold` in the gossipsub peer score thresholds.
pub const GRAYLIST_THRESHOLD: f64 = -16000.0;
/// Creates a [`gossipsub::ConfigBuilder`] pre-populated with the settings this
/// crate applies to every gossipsub configuration.
///
/// The returned builder:
/// - uses `/meshsub/1.0.0` as the protocol id prefix,
/// - derives each message id from the SHA-256 digest of the message payload,
/// - has `validate_messages` enabled.
///
/// Callers can keep customising the builder before calling `build()`.
pub fn default_gossipsub_builder() -> gossipsub::ConfigBuilder {
    let mut config_builder = gossipsub::ConfigBuilder::default();
    config_builder
        .protocol_id_prefix("/meshsub/1.0.0")
        .message_id_fn(|message: &gossipsub::Message| {
            // Only the payload is hashed, so messages carrying identical data
            // always map to the same id.
            let payload_digest = Sha256::digest(&message.data);
            MessageId::from(&payload_digest[..])
        })
        .validate_messages();
    config_builder
}
/// Builds the default gossipsub [`gossipsub::Config`] used by this crate.
///
/// Starts from [`default_gossipsub_builder`] and then fixes the mesh sizes,
/// gossip/history windows, maximum transmit size and timing values.
///
/// # Panics
///
/// Panics if the builder rejects the combination of values set here.
pub(crate) fn default_gossipsub_config() -> gossipsub::Config {
    let mut config_builder = default_gossipsub_builder();

    // Mesh sizing: target `MESH_SIZE`, with 6 and 12 as the low/high marks.
    config_builder
        .mesh_n(MESH_SIZE)
        .mesh_n_low(6)
        .mesh_n_high(12);

    // Gossip emission and message history windows.
    config_builder
        .gossip_lazy(6)
        .history_length(5)
        .history_gossip(3);

    // Transmit size limit and timing.
    config_builder
        .max_transmit_size(MAX_RESPONSE_SIZE)
        .heartbeat_interval(Duration::from_millis(700))
        .fanout_ttl(Duration::from_secs(60));

    config_builder
        .build()
        .expect("valid gossipsub configuration")
}
/// Builds the gossipsub scoring parameters for a single topic.
///
/// `topic_weight` is stored as the topic's weight and also scales the
/// invalid-message weight so that `invalid_message_deliveries_weight *
/// topic_weight == -10`. Every `mesh_message_deliveries_*` and
/// `mesh_failure_penalty_*` field is set to zero.
///
/// Note: a `topic_weight` of `0.0` yields an infinite invalid-message weight.
fn initialize_topic_score_params(topic_weight: f64) -> TopicScoreParams {
    let mesh_quantum = SLOT;
    // Decay factors are derived from multiples of `EPOCH` (100 and 50 epochs).
    let first_deliveries_decay = score_parameter_decay(
        EPOCH
            .checked_mul(100)
            .expect("`EPOCH` is usually not more than a year"),
    );
    let invalid_deliveries_decay = score_parameter_decay(
        EPOCH
            .checked_mul(50)
            .expect("`EPOCH` is usually not more than a year"),
    );

    let mut score = TopicScoreParams::default();
    score.topic_weight = topic_weight;

    // Time in mesh: counted in `SLOT`-sized quanta, capped at one hour's worth.
    score.time_in_mesh_quantum = mesh_quantum;
    score.time_in_mesh_cap = 3600.0 / mesh_quantum.as_secs_f64();
    score.time_in_mesh_weight = 0.5;

    // First message deliveries.
    score.first_message_deliveries_decay = first_deliveries_decay;
    score.first_message_deliveries_cap = 1000.0;
    score.first_message_deliveries_weight = 0.5;

    // Mesh message deliveries and mesh failure penalty: all zeroed.
    score.mesh_message_deliveries_weight = 0.0;
    score.mesh_message_deliveries_threshold = 0.0;
    score.mesh_message_deliveries_decay = 0.0;
    score.mesh_message_deliveries_cap = 0.0;
    score.mesh_message_deliveries_window = Duration::from_secs(0);
    score.mesh_message_deliveries_activation = Duration::from_secs(0);
    score.mesh_failure_penalty_decay = 0.0;
    score.mesh_failure_penalty_weight = 0.0;

    // Invalid message deliveries: weight is divided by the topic weight.
    score.invalid_message_deliveries_weight = -10.0 / score.topic_weight;
    score.invalid_message_deliveries_decay = invalid_deliveries_decay;

    score
}
/// Returns the per-interval decay factor for a value that should shrink to
/// `DECAY_TO_ZERO` of its size over `decay_time`.
///
/// With `n = decay_time / DECAY_INTERVAL` decay steps, the result `f`
/// satisfies `f.powf(n) == DECAY_TO_ZERO` (up to floating point error).
/// A zero `decay_time` yields `0.0`.
fn score_parameter_decay(decay_time: Duration) -> f64 {
    let decay_steps = decay_time.as_secs_f64() / DECAY_INTERVAL.as_secs_f64();
    DECAY_TO_ZERO.powf(decay_steps.recip())
}
/// Builds the peer-level gossipsub scoring parameters.
///
/// `thresholds.gossip_threshold` is used to derive the behaviour penalty
/// weight. Fields not listed in the struct literal keep the values from
/// `PeerScoreParams::default()`.
fn initialize_peer_score_params(thresholds: &PeerScoreThresholds) -> PeerScoreParams {
    let retain_score = EPOCH
        .checked_mul(100)
        .expect("`EPOCH` is usually not more than a year");
    let behaviour_penalty_decay = score_parameter_decay(
        EPOCH
            .checked_mul(10)
            .expect("`EPOCH` is usually not more than a year"),
    );
    let behaviour_penalty_threshold: f64 = 6.0;
    let topic_score_cap: f64 = 400.0;

    // NOTE(review): `behaviour_penalty_decay` is a per-interval factor below 1,
    // so this offset is dominated by the threshold. Confirm that this is the
    // intended input for the weight calculation.
    let penalty_offset = behaviour_penalty_decay - behaviour_penalty_threshold;

    PeerScoreParams {
        decay_interval: DECAY_INTERVAL,
        decay_to_zero: DECAY_TO_ZERO,
        retain_score,
        app_specific_weight: 0.0,
        ip_colocation_factor_threshold: 50.0,
        // The colocation weight mirrors the topic score cap with opposite sign.
        ip_colocation_factor_weight: -topic_score_cap,
        behaviour_penalty_threshold,
        behaviour_penalty_decay,
        behaviour_penalty_weight: thresholds.gossip_threshold / penalty_offset.powi(2),
        topic_score_cap,
        ..Default::default()
    }
}
/// Returns the fixed peer score thresholds used by this crate's gossipsub
/// behaviour.
///
/// The three negative thresholds grow stricter in the order gossip (-4000),
/// publish (-8000), graylist (`GRAYLIST_THRESHOLD`).
fn initialize_peer_score_thresholds() -> PeerScoreThresholds {
    let gossip_threshold = -4000.0;
    let publish_threshold = -8000.0;
    let graylist_threshold = GRAYLIST_THRESHOLD;

    PeerScoreThresholds {
        gossip_threshold,
        publish_threshold,
        graylist_threshold,
        accept_px_threshold: 40.0,
        opportunistic_graft_threshold: 5.0,
    }
}
/// Creates the gossipsub behaviour described by `p2p_config`.
///
/// Steps:
/// 1. Construct the behaviour with signed message authenticity, using
///    `p2p_config.keypair` and `p2p_config.gossipsub_config`. When
///    `p2p_config.metrics` is set, the behaviour is created with a fresh
///    prometheus registry, which is then handed to `p2p_metrics()`.
/// 2. Apply peer scoring and topic subscriptions via `initialize_gossipsub`.
/// 3. Add every reserved node whose address yields a peer id as an explicit
///    peer; addresses without a peer id are skipped.
///
/// # Panics
///
/// Panics if the behaviour cannot be constructed, or if `initialize_gossipsub`
/// panics.
pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> gossipsub::Behaviour {
    let authenticity = MessageAuthenticity::Signed(p2p_config.keypair.clone());
    let gossipsub_config = p2p_config.gossipsub_config.clone();

    let mut gossipsub = if p2p_config.metrics {
        let mut p2p_registry = prometheus_client::registry::Registry::default();
        let behaviour = gossipsub::Behaviour::new_with_metrics(
            authenticity,
            gossipsub_config,
            &mut p2p_registry,
            MetricsConfig::default(),
        )
        .expect("gossipsub initialized");

        // Best effort: a failure to store the registry is deliberately ignored
        // (presumably it only fails when one was already installed; TODO confirm).
        p2p_metrics()
            .gossip_sub_registry
            .set(Box::new(p2p_registry))
            .unwrap_or(());

        behaviour
    } else {
        gossipsub::Behaviour::new(authenticity, gossipsub_config)
            .expect("gossipsub initialized")
    };

    // Scoring and subscriptions are the same with or without metrics.
    initialize_gossipsub(&mut gossipsub, p2p_config);

    // Iterate the configured addresses by reference; no copy of the list is needed.
    let explicit_peers = p2p_config
        .reserved_nodes
        .iter()
        .filter_map(|address| address.try_to_peer_id());
    for peer_id in explicit_peers {
        gossipsub.add_explicit_peer(&peer_id);
    }

    gossipsub
}
/// Enables peer scoring on `gossipsub` and subscribes it to this crate's
/// gossip topics.
///
/// Each topic name is suffixed with `/{network_name}` from `p2p_config`, given
/// its topic score parameters, and then subscribed to.
///
/// # Panics
///
/// Panics if peer scoring cannot be enabled, if topic score parameters cannot
/// be set, or if a subscription fails. The subscription panic message names
/// the topic that failed.
fn initialize_gossipsub(gossipsub: &mut gossipsub::Behaviour, p2p_config: &Config) {
    let peer_score_thresholds = initialize_peer_score_thresholds();
    let peer_score_params = initialize_peer_score_params(&peer_score_thresholds);

    gossipsub
        .with_peer_score(peer_score_params, peer_score_thresholds)
        .expect("gossipsub initialized with peer score");

    // (topic name, topic weight) pairs to subscribe to.
    let topics = [(NEW_TX_GOSSIP_TOPIC, NEW_TX_GOSSIP_WEIGHT)];
    for (topic, weight) in topics {
        let t: GossipTopic = Topic::new(format!("{}/{}", topic, p2p_config.network_name));

        gossipsub
            .set_topic_params(t.clone(), initialize_topic_score_params(weight))
            .expect("First time initializing Topic Score");

        // `expect` takes a plain string and would print `{topic}` literally,
        // so format the message explicitly to include the topic name.
        gossipsub.subscribe(&t).unwrap_or_else(|err| {
            panic!("Subscription to Topic: {} successful: {:?}", topic, err)
        });
    }
}