use std::{
cmp::{max, Ordering},
collections::HashSet,
collections::VecDeque,
collections::{BTreeSet, HashMap},
fmt,
iter::FromIterator,
net::IpAddr,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use futures::StreamExt;
use log::{debug, error, info, trace, warn};
use prost::Message;
use rand::{seq::SliceRandom, thread_rng};
use wasm_timer::{Instant, Interval};
use libp2p_core::{
connection::ConnectionId, identity::Keypair, multiaddr::Protocol::Ip4,
multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId,
};
use libp2p_swarm::{
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
ProtocolsHandler,
};
use crate::backoff::BackoffStorage;
use crate::config::{GossipsubConfig, ValidationMode};
use crate::error::{PublishError, SubscriptionError, ValidationError};
use crate::gossip_promises::GossipPromises;
use crate::handler::{GossipsubHandler, HandlerEvent};
use crate::mcache::MessageCache;
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
use crate::time_cache::{DuplicateCache, TimeCache};
use crate::topic::{Hasher, Topic, TopicHash};
use crate::transform::{DataTransform, IdentityTransform};
use crate::types::{
FastMessageId, GossipsubControlAction, GossipsubMessage, GossipsubSubscription,
GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage,
};
use crate::types::{GossipsubRpc, PeerKind};
use crate::{rpc_proto, TopicScoreParams};
use std::{cmp::Ordering::Equal, fmt::Debug};
#[cfg(test)]
mod tests;
#[derive(Clone)]
pub enum MessageAuthenticity {
Signed(Keypair),
Author(PeerId),
RandomAuthor,
Anonymous,
}
impl MessageAuthenticity {
pub fn is_signing(&self) -> bool {
matches!(self, MessageAuthenticity::Signed(_))
}
pub fn is_anonymous(&self) -> bool {
matches!(self, MessageAuthenticity::Anonymous)
}
}
#[derive(Debug)]
pub enum GossipsubEvent {
Message {
propagation_source: PeerId,
message_id: MessageId,
message: GossipsubMessage,
},
Subscribed {
peer_id: PeerId,
topic: TopicHash,
},
Unsubscribed {
peer_id: PeerId,
topic: TopicHash,
},
}
enum PublishConfig {
Signing {
keypair: Keypair,
author: PeerId,
inline_key: Option<Vec<u8>>,
},
Author(PeerId),
RandomAuthor,
Anonymous,
}
impl PublishConfig {
pub fn get_own_id(&self) -> Option<&PeerId> {
match self {
Self::Signing { author, .. } => Some(&author),
Self::Author(author) => Some(&author),
_ => None,
}
}
}
impl From<MessageAuthenticity> for PublishConfig {
fn from(authenticity: MessageAuthenticity) -> Self {
match authenticity {
MessageAuthenticity::Signed(keypair) => {
let public_key = keypair.public();
let key_enc = public_key.clone().into_protobuf_encoding();
let key = if key_enc.len() <= 42 {
None
} else {
Some(key_enc)
};
PublishConfig::Signing {
keypair,
author: public_key.into_peer_id(),
inline_key: key,
}
}
MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
}
}
}
type GossipsubNetworkBehaviourAction = NetworkBehaviourAction<Arc<rpc_proto::Rpc>, GossipsubEvent>;
pub struct Gossipsub<
D: DataTransform = IdentityTransform,
F: TopicSubscriptionFilter = AllowAllSubscriptionFilter,
> {
config: GossipsubConfig,
events: VecDeque<GossipsubNetworkBehaviourAction>,
control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>,
publish_config: PublishConfig,
duplicate_cache: DuplicateCache<MessageId>,
peer_protocols: HashMap<PeerId, PeerKind>,
topic_peers: HashMap<TopicHash, BTreeSet<PeerId>>,
peer_topics: HashMap<PeerId, BTreeSet<TopicHash>>,
explicit_peers: HashSet<PeerId>,
blacklisted_peers: HashSet<PeerId>,
mesh: HashMap<TopicHash, BTreeSet<PeerId>>,
fanout: HashMap<TopicHash, BTreeSet<PeerId>>,
fanout_last_pub: HashMap<TopicHash, Instant>,
backoffs: BackoffStorage,
mcache: MessageCache,
heartbeat: Interval,
heartbeat_ticks: u64,
px_peers: HashSet<PeerId>,
outbound_peers: HashSet<PeerId>,
peer_score: Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,
count_received_ihave: HashMap<PeerId, usize>,
count_sent_iwant: HashMap<PeerId, usize>,
published_message_ids: DuplicateCache<MessageId>,
fast_messsage_id_cache: TimeCache<FastMessageId, MessageId>,
subscription_filter: F,
data_transform: D,
}
impl<D, F> Gossipsub<D, F>
where
D: DataTransform + Default,
F: TopicSubscriptionFilter + Default,
{
pub fn new(
privacy: MessageAuthenticity,
config: GossipsubConfig,
) -> Result<Self, &'static str> {
Self::new_with_subscription_filter_and_transform(
privacy,
config,
F::default(),
D::default(),
)
}
}
impl<D, F> Gossipsub<D, F>
where
D: DataTransform + Default,
F: TopicSubscriptionFilter,
{
pub fn new_with_subscription_filter(
privacy: MessageAuthenticity,
config: GossipsubConfig,
subscription_filter: F,
) -> Result<Self, &'static str> {
Self::new_with_subscription_filter_and_transform(
privacy,
config,
subscription_filter,
D::default(),
)
}
}
impl<D, F> Gossipsub<D, F>
where
D: DataTransform,
F: TopicSubscriptionFilter + Default,
{
pub fn new_with_transform(
privacy: MessageAuthenticity,
config: GossipsubConfig,
data_transform: D,
) -> Result<Self, &'static str> {
Self::new_with_subscription_filter_and_transform(
privacy,
config,
F::default(),
data_transform,
)
}
}
impl<D, F> Gossipsub<D, F>
where
D: DataTransform,
F: TopicSubscriptionFilter,
{
pub fn new_with_subscription_filter_and_transform(
privacy: MessageAuthenticity,
config: GossipsubConfig,
subscription_filter: F,
data_transform: D,
) -> Result<Self, &'static str> {
validate_config(&privacy, &config.validation_mode())?;
Ok(Gossipsub {
events: VecDeque::new(),
control_pool: HashMap::new(),
publish_config: privacy.into(),
duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
fast_messsage_id_cache: TimeCache::new(config.duplicate_cache_time()),
topic_peers: HashMap::new(),
peer_topics: HashMap::new(),
explicit_peers: HashSet::new(),
blacklisted_peers: HashSet::new(),
mesh: HashMap::new(),
fanout: HashMap::new(),
fanout_last_pub: HashMap::new(),
backoffs: BackoffStorage::new(
&config.prune_backoff(),
config.heartbeat_interval(),
config.backoff_slack(),
),
mcache: MessageCache::new(config.history_gossip(), config.history_length()),
heartbeat: Interval::new_at(
Instant::now() + config.heartbeat_initial_delay(),
config.heartbeat_interval(),
),
heartbeat_ticks: 0,
px_peers: HashSet::new(),
outbound_peers: HashSet::new(),
peer_score: None,
count_received_ihave: HashMap::new(),
count_sent_iwant: HashMap::new(),
peer_protocols: HashMap::new(),
published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
config,
subscription_filter,
data_transform,
})
}
}
impl<D, F> Gossipsub<D, F>
where
D: DataTransform,
F: TopicSubscriptionFilter,
{
pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
self.mesh.keys()
}
pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
self.mesh
.get(topic_hash)
.into_iter()
.map(|x| x.iter())
.flatten()
}
pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
let mut res = BTreeSet::new();
for peers in self.mesh.values() {
res.extend(peers);
}
res.into_iter()
}
pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
self.peer_topics
.iter()
.map(|(peer_id, topic_set)| (peer_id, topic_set.iter().collect()))
}
pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
self.peer_protocols.iter()
}
pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
self.peer_score
.as_ref()
.map(|(score, ..)| score.score(peer_id))
}
pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
debug!("Subscribing to topic: {}", topic);
let topic_hash = topic.hash();
if !self.subscription_filter.can_subscribe(&topic_hash) {
return Err(SubscriptionError::NotAllowed);
}
if self.mesh.get(&topic_hash).is_some() {
debug!("Topic: {} is already in the mesh.", topic);
return Ok(false);
}
let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
if !peer_list.is_empty() {
let event = Arc::new(
GossipsubRpc {
messages: Vec::new(),
subscriptions: vec![GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
}],
control_msgs: Vec::new(),
}
.into_protobuf(),
);
for peer in peer_list {
debug!("Sending SUBSCRIBE to peer: {:?}", peer);
self.send_message(peer, event.clone())
.map_err(SubscriptionError::PublishError)?;
}
}
self.join(&topic_hash);
info!("Subscribed to topic: {}", topic);
Ok(true)
}
pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, PublishError> {
debug!("Unsubscribing from topic: {}", topic);
let topic_hash = topic.hash();
if self.mesh.get(&topic_hash).is_none() {
debug!("Already unsubscribed from topic: {:?}", topic_hash);
return Ok(false);
}
let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
if !peer_list.is_empty() {
let event = Arc::new(
GossipsubRpc {
messages: Vec::new(),
subscriptions: vec![GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Unsubscribe,
}],
control_msgs: Vec::new(),
}
.into_protobuf(),
);
for peer in peer_list {
debug!("Sending UNSUBSCRIBE to peer: {}", peer.to_string());
self.send_message(peer, event.clone())?;
}
}
self.leave(&topic_hash);
info!("Unsubscribed from topic: {:?}", topic_hash);
Ok(true)
}
pub fn publish<H: Hasher>(
&mut self,
topic: Topic<H>,
data: impl Into<Vec<u8>>,
) -> Result<MessageId, PublishError> {
let data = data.into();
let transformed_data = self
.data_transform
.outbound_transform(&topic.hash(), data.clone())?;
let raw_message = self.build_raw_message(topic.into(), transformed_data)?;
let msg_id = self.config.message_id(&GossipsubMessage {
source: raw_message.source.clone(),
data,
sequence_number: raw_message.sequence_number,
topic: raw_message.topic.clone(),
});
let event = Arc::new(
GossipsubRpc {
subscriptions: Vec::new(),
messages: vec![raw_message.clone()],
control_msgs: Vec::new(),
}
.into_protobuf(),
);
if event.encoded_len() > self.config.max_transmit_size() {
return Err(PublishError::MessageTooLarge);
}
if !self.duplicate_cache.insert(msg_id.clone()) {
warn!(
"Not publishing a message that has already been published. Msg-id {}",
msg_id
);
return Err(PublishError::Duplicate);
}
self.mcache.put(&msg_id, raw_message.clone());
debug!("Publishing message: {:?}", msg_id);
if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
if !self.config.allow_self_origin() {
self.published_message_ids.insert(msg_id.clone());
}
}
let topic_hash = raw_message.topic.clone();
let mesh_peers_sent =
!self.config.flood_publish() && self.forward_msg(&msg_id, raw_message, None)?;
let mut recipient_peers = HashSet::new();
if let Some(set) = self.topic_peers.get(&topic_hash) {
if self.config.flood_publish() {
recipient_peers.extend(
set.iter()
.filter(|p| {
self.explicit_peers.contains(*p)
|| !self.score_below_threshold(*p, |ts| ts.publish_threshold).0
})
.cloned(),
);
} else {
for peer in &self.explicit_peers {
if set.contains(peer) {
recipient_peers.insert(peer.clone());
}
}
for (peer, kind) in &self.peer_protocols {
if kind == &PeerKind::Floodsub
&& !self
.score_below_threshold(peer, |ts| ts.publish_threshold)
.0
{
recipient_peers.insert(peer.clone());
}
}
if self.mesh.get(&topic_hash).is_none() {
debug!("Topic: {:?} not in the mesh", topic_hash);
if self.fanout.contains_key(&topic_hash) {
for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
recipient_peers.insert(peer.clone());
}
} else {
let mesh_n = self.config.mesh_n();
let new_peers = get_random_peers(
&self.topic_peers,
&self.peer_protocols,
&topic_hash,
mesh_n,
{
|p| {
!self.explicit_peers.contains(p)
&& !self
.score_below_threshold(p, |pst| pst.publish_threshold)
.0
}
},
);
self.fanout.insert(topic_hash.clone(), new_peers.clone());
for peer in new_peers {
debug!("Peer added to fanout: {:?}", peer);
recipient_peers.insert(peer.clone());
}
}
self.fanout_last_pub
.insert(topic_hash.clone(), Instant::now());
}
}
}
if recipient_peers.is_empty() && !mesh_peers_sent {
return Err(PublishError::InsufficientPeers);
}
for peer_id in recipient_peers.iter() {
debug!("Sending message to peer: {:?}", peer_id);
self.send_message(peer_id.clone(), event.clone())?;
}
info!("Published message: {:?}", &msg_id);
Ok(msg_id)
}
pub fn report_message_validation_result(
&mut self,
msg_id: &MessageId,
propagation_source: &PeerId,
acceptance: MessageAcceptance,
) -> Result<bool, PublishError> {
let reject_reason = match acceptance {
MessageAcceptance::Accept => {
let raw_message = match self.mcache.validate(msg_id) {
Some(raw_message) => raw_message.clone(),
None => {
warn!(
"Message not in cache. Ignoring forwarding. Message Id: {}",
msg_id
);
return Ok(false);
}
};
self.forward_msg(msg_id, raw_message, Some(propagation_source))?;
return Ok(true);
}
MessageAcceptance::Reject => RejectReason::ValidationFailed,
MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
};
if let Some(raw_message) = self.mcache.remove(msg_id) {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.reject_message(
propagation_source,
msg_id,
&raw_message.topic,
reject_reason,
);
}
Ok(true)
} else {
warn!("Rejected message not in cache. Message Id: {}", msg_id);
Ok(false)
}
}
pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
debug!("Adding explicit peer {}", peer_id);
self.explicit_peers.insert(peer_id.clone());
self.check_explicit_peer_connection(peer_id);
}
pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
debug!("Removing explicit peer {}", peer_id);
self.explicit_peers.remove(peer_id);
}
pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
if self.blacklisted_peers.insert(peer_id.clone()) {
debug!("Peer has been blacklisted: {}", peer_id);
}
}
pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
if self.blacklisted_peers.remove(peer_id) {
debug!("Peer has been removed from the blacklist: {}", peer_id);
}
}
pub fn with_peer_score(
&mut self,
params: PeerScoreParams,
threshold: PeerScoreThresholds,
) -> Result<(), String> {
self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
}
pub fn with_peer_score_and_message_delivery_time_callback(
&mut self,
params: PeerScoreParams,
threshold: PeerScoreThresholds,
callback: Option<fn(&PeerId, &TopicHash, f64)>,
) -> Result<(), String> {
params.validate()?;
threshold.validate()?;
if self.peer_score.is_some() {
return Err("Peer score set twice".into());
}
let interval = Interval::new(params.decay_interval);
let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
Ok(())
}
pub fn set_topic_params<H: Hasher>(
&mut self,
topic: Topic<H>,
params: TopicScoreParams,
) -> Result<(), &'static str> {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.set_topic_params(topic.hash(), params);
Ok(())
} else {
Err("Peer score must be initialised with `with_peer_score()`")
}
}
pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.set_application_score(peer_id, new_score)
} else {
false
}
}
fn join(&mut self, topic_hash: &TopicHash) {
debug!("Running JOIN for topic: {:?}", topic_hash);
if self.mesh.contains_key(topic_hash) {
info!("JOIN: The topic is already in the mesh, ignoring JOIN");
return;
}
let mut added_peers = HashSet::new();
if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
debug!(
"JOIN: Removing peers from the fanout for topic: {:?}",
topic_hash
);
peers = peers
.into_iter()
.filter(|p| {
!self.explicit_peers.contains(p)
&& !self.score_below_threshold(p, |_| 0.0).0
&& !self.backoffs.is_backoff_with_slack(topic_hash, p)
})
.collect();
let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
debug!(
"JOIN: Adding {:?} peers from the fanout for topic: {:?}",
add_peers, topic_hash
);
added_peers.extend(peers.iter().cloned().take(add_peers));
self.mesh.insert(
topic_hash.clone(),
peers.into_iter().take(add_peers).collect(),
);
self.fanout_last_pub.remove(topic_hash);
}
if added_peers.len() < self.config.mesh_n() {
let new_peers = get_random_peers(
&self.topic_peers,
&self.peer_protocols,
topic_hash,
self.config.mesh_n() - added_peers.len(),
|peer| {
!added_peers.contains(peer)
&& !self.explicit_peers.contains(peer)
&& !self.score_below_threshold(peer, |_| 0.0).0
&& !self.backoffs.is_backoff_with_slack(topic_hash, peer)
},
);
added_peers.extend(new_peers.clone());
debug!(
"JOIN: Inserting {:?} random peers into the mesh",
new_peers.len()
);
let mesh_peers = self
.mesh
.entry(topic_hash.clone())
.or_insert_with(Default::default);
mesh_peers.extend(new_peers);
}
for peer_id in added_peers {
info!("JOIN: Sending Graft message to peer: {:?}", peer_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.graft(&peer_id, topic_hash.clone());
}
Self::control_pool_add(
&mut self.control_pool,
peer_id.clone(),
GossipsubControlAction::Graft {
topic_hash: topic_hash.clone(),
},
);
}
debug!("Completed JOIN for topic: {:?}", topic_hash);
}
fn make_prune(
&mut self,
topic_hash: &TopicHash,
peer: &PeerId,
do_px: bool,
) -> GossipsubControlAction {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.prune(peer, topic_hash.clone());
}
match self.peer_protocols.get(peer) {
Some(PeerKind::Floodsub) => {
error!("Attempted to prune a Floodsub peer");
}
Some(PeerKind::Gossipsub) => {
return GossipsubControlAction::Prune {
topic_hash: topic_hash.clone(),
peers: Vec::new(),
backoff: None,
};
}
None => {
error!("Attempted to Prune an unknown peer");
}
_ => {}
}
let peers = if do_px {
get_random_peers(
&self.topic_peers,
&self.peer_protocols,
&topic_hash,
self.config.prune_peers(),
|p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
)
.into_iter()
.map(|p| PeerInfo { peer_id: Some(p) })
.collect()
} else {
Vec::new()
};
self.backoffs
.update_backoff(topic_hash, peer, self.config.prune_backoff());
GossipsubControlAction::Prune {
topic_hash: topic_hash.clone(),
peers,
backoff: Some(self.config.prune_backoff().as_secs()),
}
}
fn leave(&mut self, topic_hash: &TopicHash) {
debug!("Running LEAVE for topic {:?}", topic_hash);
if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
for peer in peers {
info!("LEAVE: Sending PRUNE to peer: {:?}", peer);
let control = self.make_prune(topic_hash, &peer, self.config.do_px());
Self::control_pool_add(&mut self.control_pool, peer.clone(), control);
}
}
debug!("Completed LEAVE for topic: {:?}", topic_hash);
}
fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
if !self.peer_topics.contains_key(peer_id) {
debug!("Connecting to explicit peer {:?}", peer_id);
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: peer_id.clone(),
condition: DialPeerCondition::Disconnected,
});
}
}
fn score_below_threshold(
&self,
peer_id: &PeerId,
threshold: impl Fn(&PeerScoreThresholds) -> f64,
) -> (bool, f64) {
Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold)
}
fn score_below_threshold_from_scores(
peer_score: &Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,
peer_id: &PeerId,
threshold: impl Fn(&PeerScoreThresholds) -> f64,
) -> (bool, f64) {
if let Some((peer_score, thresholds, ..)) = peer_score {
let score = peer_score.score(peer_id);
if score < threshold(thresholds) {
return (true, score);
}
(false, score)
} else {
(false, 0.0)
}
}
fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
debug!(
"IHAVE: ignoring peer {:?} with score below threshold [score = {}]",
peer_id, score
);
return;
}
let peer_have = self
.count_received_ihave
.entry(peer_id.clone())
.or_insert(0);
*peer_have += 1;
if *peer_have > self.config.max_ihave_messages() {
debug!(
"IHAVE: peer {} has advertised too many times ({}) within this heartbeat \
interval; ignoring",
peer_id, *peer_have
);
return;
}
if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
if *iasked >= self.config.max_ihave_length() {
debug!(
"IHAVE: peer {} has already advertised too many messages ({}); ignoring",
peer_id, *iasked
);
return;
}
}
debug!("Handling IHAVE for peer: {:?}", peer_id);
let mut iwant_ids = HashSet::new();
for (topic, ids) in ihave_msgs {
if !self.mesh.contains_key(&topic) {
debug!(
"IHAVE: Ignoring IHAVE - Not subscribed to topic: {:?}",
topic
);
continue;
}
for id in ids {
if !self.duplicate_cache.contains(&id) {
iwant_ids.insert(id);
}
}
}
if !iwant_ids.is_empty() {
let iasked = self.count_sent_iwant.entry(peer_id.clone()).or_insert(0);
let mut iask = iwant_ids.len();
if *iasked + iask > self.config.max_ihave_length() {
iask = self.config.max_ihave_length().saturating_sub(*iasked);
}
debug!(
"IHAVE: Asking for {} out of {} messages from {}",
iask,
iwant_ids.len(),
peer_id
);
let mut iwant_ids_vec: Vec<_> = iwant_ids.iter().collect();
let mut rng = thread_rng();
iwant_ids_vec.partial_shuffle(&mut rng, iask as usize);
iwant_ids_vec.truncate(iask as usize);
*iasked += iask;
let message_ids = iwant_ids_vec.into_iter().cloned().collect::<Vec<_>>();
if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
gossip_promises.add_promise(
peer_id.clone(),
&message_ids,
Instant::now() + self.config.iwant_followup_time(),
);
}
debug!(
"IHAVE: Asking for the following messages from {}: {:?}",
peer_id, message_ids
);
Self::control_pool_add(
&mut self.control_pool,
peer_id.clone(),
GossipsubControlAction::IWant { message_ids },
);
}
debug!("Completed IHAVE handling for peer: {:?}", peer_id);
}
fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
debug!(
"IWANT: ignoring peer {:?} with score below threshold [score = {}]",
peer_id, score
);
return;
}
debug!("Handling IWANT for peer: {:?}", peer_id);
let mut cached_messages = HashMap::new();
for id in iwant_msgs {
if let Some((msg, count)) = self.mcache.get_with_iwant_counts(&id, peer_id) {
if count > self.config.gossip_retransimission() {
debug!(
"IWANT: Peer {} has asked for message {} too many times; ignoring \
request",
peer_id, &id
);
} else {
cached_messages.insert(id.clone(), msg.clone());
}
}
}
if !cached_messages.is_empty() {
debug!("IWANT: Sending cached messages to peer: {:?}", peer_id);
let message_list = cached_messages
.into_iter()
.map(|entry| RawGossipsubMessage::from(entry.1))
.collect();
if self
.send_message(
peer_id.clone(),
GossipsubRpc {
subscriptions: Vec::new(),
messages: message_list,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send cached messages. Messages too large");
}
}
debug!("Completed IWANT handling for peer: {}", peer_id);
}
fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
debug!("Handling GRAFT message for peer: {}", peer_id);
let mut to_prune_topics = HashSet::new();
let mut do_px = self.config.do_px();
if self.explicit_peers.contains(peer_id) {
warn!("GRAFT: ignoring request from direct peer {}", peer_id);
to_prune_topics = HashSet::from_iter(topics.into_iter());
do_px = false
} else {
let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0);
let now = Instant::now();
for topic_hash in topics {
if let Some(peers) = self.mesh.get_mut(&topic_hash) {
if peers.contains(peer_id) {
debug!(
"GRAFT: Received graft for peer {:?} that is already in topic {:?}",
peer_id, &topic_hash
);
continue;
}
if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
{
if backoff_time > now {
warn!(
"GRAFT: peer attempted graft within backoff time, penalizing {}",
peer_id
);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.add_penalty(peer_id, 1);
let flood_cutoff = (backoff_time
+ self.config.graft_flood_threshold())
- self.config.prune_backoff();
if flood_cutoff > now {
peer_score.add_penalty(peer_id, 1);
}
}
do_px = false;
to_prune_topics.insert(topic_hash.clone());
continue;
}
}
if below_zero {
debug!(
"GRAFT: ignoring peer {:?} with negative score [score = {}, \
topic = {}]",
peer_id, score, topic_hash
);
to_prune_topics.insert(topic_hash.clone());
do_px = false;
continue;
}
if peers.len() >= self.config.mesh_n_high()
&& !self.outbound_peers.contains(peer_id)
{
to_prune_topics.insert(topic_hash.clone());
continue;
}
info!(
"GRAFT: Mesh link added for peer: {:?} in topic: {:?}",
peer_id, &topic_hash
);
peers.insert(peer_id.clone());
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.graft(peer_id, topic_hash);
}
} else {
do_px = false;
debug!(
"GRAFT: Received graft for unknown topic {:?} from peer {:?}",
&topic_hash, peer_id
);
continue;
}
}
}
if !to_prune_topics.is_empty() {
let prune_messages = to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px))
.collect();
info!(
"GRAFT: Not subscribed to topics - Sending PRUNE to peer: {}",
peer_id
);
if self
.send_message(
peer_id.clone(),
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: prune_messages,
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send graft. Message too large");
}
}
debug!("Completed GRAFT handling for peer: {}", peer_id);
}
fn remove_peer_from_mesh(
&mut self,
peer_id: &PeerId,
topic_hash: &TopicHash,
backoff: Option<u64>,
always_update_backoff: bool,
) {
let mut update_backoff = always_update_backoff;
if let Some(peers) = self.mesh.get_mut(&topic_hash) {
if peers.remove(peer_id) {
info!(
"PRUNE: Removing peer: {} from the mesh for topic: {}",
peer_id.to_string(),
topic_hash
);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.prune(peer_id, topic_hash.clone());
}
update_backoff = true;
}
}
if update_backoff {
let time = if let Some(backoff) = backoff {
Duration::from_secs(backoff)
} else {
self.config.prune_backoff()
};
self.backoffs.update_backoff(&topic_hash, peer_id, time);
}
}
fn handle_prune(
&mut self,
peer_id: &PeerId,
prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
) {
debug!("Handling PRUNE message for peer: {}", peer_id);
let (below_threshold, score) =
self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
for (topic_hash, px, backoff) in prune_data {
self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true);
if self.mesh.contains_key(&topic_hash) {
if !px.is_empty() {
if below_threshold {
debug!(
"PRUNE: ignoring PX from peer {:?} with insufficient score \
[score ={} topic = {}]",
peer_id, score, topic_hash
);
continue;
}
if self.config.prune_peers() > 0 {
self.px_connect(px);
}
}
}
}
debug!("Completed PRUNE handling for peer: {}", peer_id.to_string());
}
fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
let n = self.config.prune_peers();
px = px.into_iter().filter(|p| p.peer_id.is_some()).collect();
if px.len() > n {
let mut rng = thread_rng();
px.partial_shuffle(&mut rng, n);
px = px.into_iter().take(n).collect();
}
for p in px {
if let Some(peer_id) = p.peer_id {
self.px_peers.insert(peer_id.clone());
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
});
}
}
}
fn message_is_valid(
&mut self,
msg_id: &MessageId,
raw_message: &mut RawGossipsubMessage,
propagation_source: &PeerId,
) -> bool {
debug!(
"Handling message: {:?} from peer: {}",
msg_id,
propagation_source.to_string()
);
if self.blacklisted_peers.contains(propagation_source) {
debug!(
"Rejecting message from blacklisted peer: {}",
propagation_source
);
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
peer_score.reject_message(
propagation_source,
msg_id,
&raw_message.topic,
RejectReason::BlackListedPeer,
);
gossip_promises.reject_message(msg_id, &RejectReason::BlackListedPeer);
}
return false;
}
if let Some(source) = raw_message.source.as_ref() {
if self.blacklisted_peers.contains(source) {
debug!(
"Rejecting message from peer {} because of blacklisted source: {}",
propagation_source, source
);
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
peer_score.reject_message(
propagation_source,
msg_id,
&raw_message.topic,
RejectReason::BlackListedSource,
);
gossip_promises.reject_message(msg_id, &RejectReason::BlackListedSource);
}
return false;
}
}
if !self.config.validate_messages() {
raw_message.validated = true;
}
let self_published = !self.config.allow_self_origin()
&& if let Some(own_id) = self.publish_config.get_own_id() {
own_id != propagation_source
&& raw_message.source.as_ref().map_or(false, |s| s == own_id)
} else {
self.published_message_ids.contains(&msg_id)
};
if self_published {
debug!(
"Dropping message {} claiming to be from self but forwarded from {}",
msg_id, propagation_source
);
if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score {
peer_score.reject_message(
propagation_source,
msg_id,
&raw_message.topic,
RejectReason::SelfOrigin,
);
gossip_promises.reject_message(msg_id, &RejectReason::SelfOrigin);
}
return false;
}
true
}
fn handle_received_message(
&mut self,
mut raw_message: RawGossipsubMessage,
propagation_source: &PeerId,
) {
let fast_message_id = self.config.fast_message_id(&raw_message);
if let Some(fast_message_id) = fast_message_id.as_ref() {
if let Some(msg_id) = self.fast_messsage_id_cache.get(fast_message_id) {
let msg_id = msg_id.clone();
self.message_is_valid(&msg_id, &mut raw_message, propagation_source);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.duplicated_message(propagation_source, &msg_id, &raw_message.topic);
}
return;
}
}
let message = match self.data_transform.inbound_transform(raw_message.clone()) {
Ok(message) => message,
Err(e) => {
debug!("Invalid message. Transform error: {:?}", e);
self.handle_invalid_message(
propagation_source,
raw_message,
ValidationError::TransformFailed,
);
return;
}
};
let msg_id = self.config.message_id(&message);
if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
return;
}
if let Some(fast_message_id) = fast_message_id {
self.fast_messsage_id_cache
.entry(fast_message_id)
.or_insert_with(|| msg_id.clone());
}
if !self.duplicate_cache.insert(msg_id.clone()) {
debug!(
"Message already received, ignoring. Message: {}",
msg_id.clone()
);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
}
return;
}
debug!(
"Put message {:?} in duplicate_cache and resolve promises",
msg_id
);
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
peer_score.validate_message(propagation_source, &msg_id, &message.topic);
gossip_promises.message_delivered(&msg_id);
}
self.mcache.put(&msg_id, raw_message.clone());
if self.mesh.contains_key(&message.topic) {
debug!("Sending received message to user");
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Message {
propagation_source: propagation_source.clone(),
message_id: msg_id.clone(),
message,
},
));
} else {
debug!(
"Received message on a topic we are not subscribed to: {:?}",
message.topic
);
return;
}
if !self.config.validate_messages() {
if self
.forward_msg(&msg_id, raw_message, Some(propagation_source))
.is_err()
{
error!("Failed to forward message. Too large");
}
debug!("Completed message handling for message: {:?}", msg_id);
}
}
fn handle_invalid_message(
&mut self,
propagation_source: &PeerId,
raw_message: RawGossipsubMessage,
validation_error: ValidationError,
) {
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
let reason = RejectReason::ValidationError(validation_error);
let fast_message_id_cache = &self.fast_messsage_id_cache;
if let Some(msg_id) = self
.config
.fast_message_id(&raw_message)
.and_then(|id| fast_message_id_cache.get(&id))
{
peer_score.reject_message(propagation_source, msg_id, &raw_message.topic, reason);
gossip_promises.reject_message(msg_id, &reason);
} else {
peer_score.reject_invalid_message(propagation_source, &raw_message.topic);
}
}
}
fn handle_received_subscriptions(
&mut self,
subscriptions: &[GossipsubSubscription],
propagation_source: &PeerId,
) {
debug!(
"Handling subscriptions: {:?}, from source: {}",
subscriptions,
propagation_source.to_string()
);
let mut unsubscribed_peers = Vec::new();
let subscribed_topics = match self.peer_topics.get_mut(propagation_source) {
Some(topics) => topics,
None => {
error!(
"Subscription by unknown peer: {}",
propagation_source.to_string()
);
return;
}
};
let mut grafts = Vec::new();
let mut application_event = Vec::new();
let filtered_topics = match self
.subscription_filter
.filter_incoming_subscriptions(subscriptions, subscribed_topics)
{
Ok(topics) => topics,
Err(s) => {
error!(
"Subscription filter error: {}; ignoring RPC from peer {}",
s,
propagation_source.to_string()
);
return;
}
};
for subscription in filtered_topics {
let peer_list = self
.topic_peers
.entry(subscription.topic_hash.clone())
.or_insert_with(Default::default);
match subscription.action {
GossipsubSubscriptionAction::Subscribe => {
if peer_list.insert(propagation_source.clone()) {
debug!(
"SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}",
propagation_source.to_string(),
subscription.topic_hash
);
}
subscribed_topics.insert(subscription.topic_hash.clone());
if !self.explicit_peers.contains(propagation_source)
&& match self.peer_protocols.get(propagation_source) {
Some(PeerKind::Gossipsubv1_1) => true,
Some(PeerKind::Gossipsub) => true,
_ => false,
}
&& !Self::score_below_threshold_from_scores(
&self.peer_score,
propagation_source,
|_| 0.0,
)
.0
&& !self
.backoffs
.is_backoff_with_slack(&subscription.topic_hash, propagation_source)
{
if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
if peers.len() < self.config.mesh_n_low()
&& peers.insert(propagation_source.clone())
{
debug!(
"SUBSCRIPTION: Adding peer {} to the mesh for topic {:?}",
propagation_source.to_string(),
subscription.topic_hash
);
debug!(
"Sending GRAFT to peer {} for topic {:?}",
propagation_source.to_string(),
subscription.topic_hash
);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score
.graft(propagation_source, subscription.topic_hash.clone());
}
grafts.push(GossipsubControlAction::Graft {
topic_hash: subscription.topic_hash.clone(),
});
}
}
}
application_event.push(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Subscribed {
peer_id: propagation_source.clone(),
topic: subscription.topic_hash.clone(),
},
));
}
GossipsubSubscriptionAction::Unsubscribe => {
if peer_list.remove(propagation_source) {
info!(
"SUBSCRIPTION: Removing gossip peer: {} from topic: {:?}",
propagation_source.to_string(),
subscription.topic_hash
);
}
subscribed_topics.remove(&subscription.topic_hash);
unsubscribed_peers
.push((propagation_source.clone(), subscription.topic_hash.clone()));
application_event.push(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Unsubscribed {
peer_id: propagation_source.clone(),
topic: subscription.topic_hash.clone(),
},
));
}
}
}
for (peer_id, topic_hash) in unsubscribed_peers {
self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false);
}
if !grafts.is_empty()
&& self
.send_message(
propagation_source.clone(),
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: grafts,
}
.into_protobuf(),
)
.is_err()
{
error!("Failed sending grafts. Message too large");
}
for event in application_event {
self.events.push_back(event);
}
trace!(
"Completed handling subscriptions from source: {:?}",
propagation_source
);
}
fn apply_iwant_penalties(&mut self) {
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
for (peer, count) in gossip_promises.get_broken_promises() {
peer_score.add_penalty(&peer, count);
}
}
}
fn heartbeat(&mut self) {
debug!("Starting heartbeat");
self.heartbeat_ticks += 1;
let mut to_graft = HashMap::new();
let mut to_prune = HashMap::new();
let mut no_px = HashSet::new();
self.backoffs.heartbeat();
self.count_sent_iwant.clear();
self.count_received_ihave.clear();
self.apply_iwant_penalties();
if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
for p in self.explicit_peers.clone() {
self.check_explicit_peer_connection(&p);
}
}
let mut scores = HashMap::new();
let peer_score = &self.peer_score;
let mut score = |p: &PeerId| match peer_score {
Some((peer_score, ..)) => *scores
.entry(p.clone())
.or_insert_with(|| peer_score.score(p)),
_ => 0.0,
};
for (topic_hash, peers) in self.mesh.iter_mut() {
let explicit_peers = &self.explicit_peers;
let backoffs = &self.backoffs;
let topic_peers = &self.topic_peers;
let outbound_peers = &self.outbound_peers;
let to_remove: Vec<_> = peers
.iter()
.filter(|&p| {
if score(p) < 0.0 {
debug!(
"HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \
{}]",
p,
score(p),
topic_hash
);
let current_topic = to_prune.entry(p.clone()).or_insert_with(Vec::new);
current_topic.push(topic_hash.clone());
no_px.insert(p.clone());
true
} else {
false
}
})
.cloned()
.collect();
for peer in to_remove {
peers.remove(&peer);
}
if peers.len() < self.config.mesh_n_low() {
debug!(
"HEARTBEAT: Mesh low. Topic: {} Contains: {} needs: {}",
topic_hash,
peers.len(),
self.config.mesh_n_low()
);
let desired_peers = self.config.mesh_n() - peers.len();
let peer_list = get_random_peers(
topic_peers,
&self.peer_protocols,
topic_hash,
desired_peers,
|peer| {
!peers.contains(peer)
&& !explicit_peers.contains(peer)
&& !backoffs.is_backoff_with_slack(topic_hash, peer)
&& score(peer) >= 0.0
},
);
for peer in &peer_list {
let current_topic = to_graft.entry(peer.clone()).or_insert_with(Vec::new);
current_topic.push(topic_hash.clone());
}
debug!("Updating mesh, new mesh: {:?}", peer_list);
peers.extend(peer_list);
}
if peers.len() > self.config.mesh_n_high() {
debug!(
"HEARTBEAT: Mesh high. Topic: {} Contains: {} needs: {}",
topic_hash,
peers.len(),
self.config.mesh_n_high()
);
let excess_peer_no = peers.len() - self.config.mesh_n();
let mut rng = thread_rng();
let mut shuffled = peers.iter().cloned().collect::<Vec<_>>();
shuffled.shuffle(&mut rng);
shuffled
.sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Ordering::Equal));
shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
let mut outbound = {
let outbound_peers = &self.outbound_peers;
shuffled
.iter()
.filter(|p| outbound_peers.contains(*p))
.count()
};
let mut removed = 0;
for peer in shuffled {
if removed == excess_peer_no {
break;
}
if self.outbound_peers.contains(&peer) {
if outbound <= self.config.mesh_outbound_min() {
continue;
} else {
outbound -= 1;
}
}
peers.remove(&peer);
let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
current_topic.push(topic_hash.clone());
removed += 1;
}
}
if peers.len() >= self.config.mesh_n_low() {
let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() };
if outbound < self.config.mesh_outbound_min() {
let needed = self.config.mesh_outbound_min() - outbound;
let peer_list = get_random_peers(
topic_peers,
&self.peer_protocols,
topic_hash,
needed,
|peer| {
!peers.contains(peer)
&& !explicit_peers.contains(peer)
&& !backoffs.is_backoff_with_slack(topic_hash, peer)
&& score(peer) >= 0.0
&& outbound_peers.contains(peer)
},
);
for peer in &peer_list {
let current_topic = to_graft.entry(peer.clone()).or_insert_with(Vec::new);
current_topic.push(topic_hash.clone());
}
debug!("Updating mesh, new mesh: {:?}", peer_list);
peers.extend(peer_list);
}
}
if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
&& peers.len() > 1
&& self.peer_score.is_some()
{
if let Some((_, thresholds, _, _)) = &self.peer_score {
let mut peers_by_score: Vec<_> = peers.iter().collect();
peers_by_score
.sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Equal));
let middle = peers_by_score.len() / 2;
let median = if peers_by_score.len() % 2 == 0 {
(score(
*peers_by_score.get(middle - 1).expect(
"middle < vector length and middle > 0 since peers.len() > 0",
),
) + score(*peers_by_score.get(middle).expect("middle < vector length")))
* 0.5
} else {
score(*peers_by_score.get(middle).expect("middle < vector length"))
};
if median < thresholds.opportunistic_graft_threshold {
let peer_list = get_random_peers(
topic_peers,
&self.peer_protocols,
topic_hash,
self.config.opportunistic_graft_peers(),
|peer| {
!peers.contains(peer)
&& !explicit_peers.contains(peer)
&& !backoffs.is_backoff_with_slack(topic_hash, peer)
&& score(peer) > median
},
);
for peer in &peer_list {
let current_topic =
to_graft.entry(peer.clone()).or_insert_with(Vec::new);
current_topic.push(topic_hash.clone());
}
debug!(
"Opportunistically graft in topic {} with peers {:?}",
topic_hash, peer_list
);
peers.extend(peer_list);
}
}
}
}
{
let fanout = &mut self.fanout;
let fanout_ttl = self.config.fanout_ttl();
self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
if *last_pub_time + fanout_ttl < Instant::now() {
debug!(
"HEARTBEAT: Fanout topic removed due to timeout. Topic: {:?}",
topic_hash
);
fanout.remove(&topic_hash);
return false;
}
true
});
}
for (topic_hash, peers) in self.fanout.iter_mut() {
let mut to_remove_peers = Vec::new();
let publish_threshold = match &self.peer_score {
Some((_, thresholds, _, _)) => thresholds.publish_threshold,
_ => 0.0,
};
for peer in peers.iter() {
match self.peer_topics.get(peer) {
Some(topics) => {
if !topics.contains(&topic_hash) || score(peer) < publish_threshold {
debug!(
"HEARTBEAT: Peer removed from fanout for topic: {:?}",
topic_hash
);
to_remove_peers.push(peer.clone());
}
}
None => {
to_remove_peers.push(peer.clone());
}
}
}
for to_remove in to_remove_peers {
peers.remove(&to_remove);
}
if peers.len() < self.config.mesh_n() {
debug!(
"HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
peers.len(),
self.config.mesh_n()
);
let needed_peers = self.config.mesh_n() - peers.len();
let explicit_peers = &self.explicit_peers;
let new_peers = get_random_peers(
&self.topic_peers,
&self.peer_protocols,
topic_hash,
needed_peers,
|peer| {
!peers.contains(peer)
&& !explicit_peers.contains(peer)
&& score(peer) < publish_threshold
},
);
peers.extend(new_peers);
}
}
if self.peer_score.is_some() {
trace!("Peer_scores: {:?}", {
for peer in self.peer_topics.keys() {
score(peer);
}
scores
});
trace!("Mesh message deliveries: {:?}", {
self.mesh
.iter()
.map(|(t, peers)| {
(
t.clone(),
peers
.iter()
.map(|p| {
(
p.clone(),
peer_score
.as_ref()
.expect("peer_score.is_some()")
.0
.mesh_message_deliveries(p, t)
.unwrap_or(0.0),
)
})
.collect::<HashMap<PeerId, f64>>(),
)
})
.collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
})
}
self.emit_gossip();
if !to_graft.is_empty() | !to_prune.is_empty() {
self.send_graft_prune(to_graft, to_prune, no_px);
}
self.flush_control_pool();
self.mcache.shift();
debug!("Completed Heartbeat");
}
fn emit_gossip(&mut self) {
let mut rng = thread_rng();
for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
let mut message_ids = self.mcache.get_gossip_message_ids(&topic_hash);
if message_ids.is_empty() {
return;
}
if message_ids.len() > self.config.max_ihave_length() {
debug!(
"too many messages for gossip; will truncate IHAVE list ({} messages)",
message_ids.len()
);
} else {
message_ids.shuffle(&mut rng);
}
let n_map = |m| {
max(
self.config.gossip_lazy(),
(self.config.gossip_factor() * m as f64) as usize,
)
};
let to_msg_peers = get_random_peers_dynamic(
&self.topic_peers,
&self.peer_protocols,
&topic_hash,
n_map,
|peer| {
!peers.contains(peer)
&& !self.explicit_peers.contains(peer)
&& !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0
},
);
debug!("Gossiping IHAVE to {} peers.", to_msg_peers.len());
for peer in to_msg_peers {
let mut peer_message_ids = message_ids.clone();
if peer_message_ids.len() > self.config.max_ihave_length() {
peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
peer_message_ids.truncate(self.config.max_ihave_length());
}
Self::control_pool_add(
&mut self.control_pool,
peer.clone(),
GossipsubControlAction::IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
},
);
}
}
}
fn send_graft_prune(
&mut self,
to_graft: HashMap<PeerId, Vec<TopicHash>>,
mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
no_px: HashSet<PeerId>,
) {
for (peer, topics) in to_graft.iter() {
for topic in topics {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.graft(peer, topic.clone());
}
}
let mut control_msgs: Vec<GossipsubControlAction> = topics
.iter()
.map(|topic_hash| GossipsubControlAction::Graft {
topic_hash: topic_hash.clone(),
})
.collect();
if let Some(topics) = to_prune.remove(peer) {
let mut prunes = topics
.iter()
.map(|topic_hash| {
self.make_prune(
topic_hash,
peer,
self.config.do_px() && !no_px.contains(peer),
)
})
.collect::<Vec<_>>();
control_msgs.append(&mut prunes);
}
if self
.send_message(
peer.clone(),
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs,
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send control messages. Message too large");
}
}
for (peer, topics) in to_prune.iter() {
let remaining_prunes = topics
.iter()
.map(|topic_hash| {
self.make_prune(
topic_hash,
peer,
self.config.do_px() && !no_px.contains(peer),
)
})
.collect();
if self
.send_message(
peer.clone(),
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: remaining_prunes,
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send prune messages. Message too large");
}
}
}
fn forward_msg(
&mut self,
msg_id: &MessageId,
message: RawGossipsubMessage,
propagation_source: Option<&PeerId>,
) -> Result<bool, PublishError> {
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(peer) = propagation_source {
peer_score.deliver_message(peer, msg_id, &message.topic);
}
}
debug!("Forwarding message: {:?}", msg_id);
let mut recipient_peers = HashSet::new();
let topic = &message.topic;
if let Some(mesh_peers) = self.mesh.get(&topic) {
for peer_id in mesh_peers {
if Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref() {
recipient_peers.insert(peer_id.clone());
}
}
}
for p in &self.explicit_peers {
if let Some(topics) = self.peer_topics.get(p) {
if Some(p) != propagation_source
&& Some(p) != message.source.as_ref()
&& topics.contains(&message.topic)
{
recipient_peers.insert(p.clone());
}
}
}
if !recipient_peers.is_empty() {
let event = Arc::new(
GossipsubRpc {
subscriptions: Vec::new(),
messages: vec![RawGossipsubMessage::from(message.clone())],
control_msgs: Vec::new(),
}
.into_protobuf(),
);
for peer in recipient_peers.iter() {
debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
self.send_message(peer.clone(), event.clone())?;
}
debug!("Completed forwarding message");
Ok(true)
} else {
Ok(false)
}
}
pub(crate) fn build_raw_message(
&self,
topic: TopicHash,
data: Vec<u8>,
) -> Result<RawGossipsubMessage, PublishError> {
match &self.publish_config {
PublishConfig::Signing {
ref keypair,
author,
inline_key,
} => {
let sequence_number: u64 = rand::random();
let signature = {
let message = rpc_proto::Message {
from: Some(author.clone().to_bytes()),
data: Some(data.clone()),
seqno: Some(sequence_number.to_be_bytes().to_vec()),
topic: topic.clone().into_string(),
signature: None,
key: None,
};
let mut buf = Vec::with_capacity(message.encoded_len());
message
.encode(&mut buf)
.expect("Buffer has sufficient capacity");
let mut signature_bytes = SIGNING_PREFIX.to_vec();
signature_bytes.extend_from_slice(&buf);
Some(keypair.sign(&signature_bytes)?)
};
Ok(RawGossipsubMessage {
source: Some(author.clone()),
data,
sequence_number: Some(sequence_number),
topic,
signature,
key: inline_key.clone(),
validated: true,
})
}
PublishConfig::Author(peer_id) => {
Ok(RawGossipsubMessage {
source: Some(peer_id.clone()),
data,
sequence_number: Some(rand::random()),
topic,
signature: None,
key: None,
validated: true,
})
}
PublishConfig::RandomAuthor => {
Ok(RawGossipsubMessage {
source: Some(PeerId::random()),
data,
sequence_number: Some(rand::random()),
topic,
signature: None,
key: None,
validated: true,
})
}
PublishConfig::Anonymous => {
Ok(RawGossipsubMessage {
source: None,
data,
sequence_number: None,
topic,
signature: None,
key: None,
validated: true,
})
}
}
}
fn control_pool_add(
control_pool: &mut HashMap<PeerId, Vec<GossipsubControlAction>>,
peer: PeerId,
control: GossipsubControlAction,
) {
control_pool
.entry(peer)
.or_insert_with(Vec::new)
.push(control);
}
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
if self
.send_message(
peer,
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: controls,
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to flush control pool. Message too large");
}
}
}
fn send_message(
&mut self,
peer_id: PeerId,
message: impl Into<Arc<rpc_proto::Rpc>>,
) -> Result<(), PublishError> {
let messages = self.fragment_message(message.into())?;
for message in messages {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
event: message,
handler: NotifyHandler::Any,
})
}
Ok(())
}
fn fragment_message(
&self,
rpc: Arc<rpc_proto::Rpc>,
) -> Result<Vec<Arc<rpc_proto::Rpc>>, PublishError> {
if rpc.encoded_len() < self.config.max_transmit_size() {
return Ok(vec![rpc]);
}
let new_rpc = rpc_proto::Rpc {
subscriptions: Vec::new(),
publish: Vec::new(),
control: None,
};
let mut rpc_list = vec![new_rpc.clone()];
macro_rules! create_or_add_rpc {
($object_size: ident ) => {
let list_index = rpc_list.len() - 1;
if rpc_list[list_index].encoded_len() + (($object_size as f64) * 1.05) as usize
> self.config.max_transmit_size()
&& rpc_list[list_index] != new_rpc
{
rpc_list.push(new_rpc.clone());
}
};
};
macro_rules! add_item {
($object: ident, $type: ident ) => {
let object_size = $object.encoded_len();
if object_size + 2 > self.config.max_transmit_size() {
error!("Individual message too large to fragment");
return Err(PublishError::MessageTooLarge);
}
create_or_add_rpc!(object_size);
rpc_list
.last_mut()
.expect("Must have at least one element")
.$type
.push($object.clone());
};
}
for message in &rpc.publish {
add_item!(message, publish);
}
for subscription in &rpc.subscriptions {
add_item!(subscription, subscriptions);
}
let empty_control = rpc_proto::ControlMessage::default();
if let Some(control) = rpc.control.as_ref() {
if control.encoded_len() + 2 > self.config.max_transmit_size() {
for ihave in &control.ihave {
let len = ihave.encoded_len();
create_or_add_rpc!(len);
rpc_list
.last_mut()
.expect("Always an element")
.control
.get_or_insert_with(|| empty_control.clone())
.ihave
.push(ihave.clone());
}
for iwant in &control.iwant {
let len = iwant.encoded_len();
create_or_add_rpc!(len);
rpc_list
.last_mut()
.expect("Always an element")
.control
.get_or_insert_with(|| empty_control.clone())
.iwant
.push(iwant.clone());
}
for graft in &control.graft {
let len = graft.encoded_len();
create_or_add_rpc!(len);
rpc_list
.last_mut()
.expect("Always an element")
.control
.get_or_insert_with(|| empty_control.clone())
.graft
.push(graft.clone());
}
for prune in &control.prune {
let len = prune.encoded_len();
create_or_add_rpc!(len);
rpc_list
.last_mut()
.expect("Always an element")
.control
.get_or_insert_with(|| empty_control.clone())
.prune
.push(prune.clone());
}
} else {
let len = control.encoded_len();
create_or_add_rpc!(len);
rpc_list.last_mut().expect("Always an element").control = Some(control.clone());
}
}
Ok(rpc_list.into_iter().map(Arc::new).collect())
}
}
fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
addr.iter().find_map(|p| match p {
Ip4(addr) => Some(IpAddr::V4(addr)),
Ip6(addr) => Some(IpAddr::V6(addr)),
_ => None,
})
}
impl<C, F> NetworkBehaviour for Gossipsub<C, F>
where
C: Send + 'static + DataTransform,
F: Send + 'static + TopicSubscriptionFilter,
{
type ProtocolsHandler = GossipsubHandler;
type OutEvent = GossipsubEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
GossipsubHandler::new(
self.config.protocol_id_prefix().clone(),
self.config.max_transmit_size(),
self.config.validation_mode().clone(),
self.config.support_floodsub(),
)
}
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, peer_id: &PeerId) {
if self.blacklisted_peers.contains(peer_id) {
debug!("Ignoring connection from blacklisted peer: {}", peer_id);
return;
}
info!("New peer connected: {}", peer_id);
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}
if !subscriptions.is_empty() {
if self
.send_message(
peer_id.clone(),
GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.is_err()
{
error!("Failed to send subscriptions, message too large");
}
}
self.peer_topics.insert(peer_id.clone(), Default::default());
self.peer_protocols
.entry(peer_id.clone())
.or_insert(PeerKind::Floodsub);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.add_peer(peer_id.clone());
}
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
debug!("Peer disconnected: {}", peer_id);
{
let topics = match self.peer_topics.get(peer_id) {
Some(topics) => (topics),
None => {
if !self.blacklisted_peers.contains(peer_id) {
debug!("Disconnected node, not in connected nodes");
}
return;
}
};
for topic in topics {
if let Some(mesh_peers) = self.mesh.get_mut(&topic) {
mesh_peers.remove(peer_id);
}
if let Some(peer_list) = self.topic_peers.get_mut(&topic) {
if !peer_list.remove(peer_id) {
warn!(
"Disconnected node: {} not in topic_peers peer list",
peer_id
);
}
} else {
warn!(
"Disconnected node: {} with topic: {:?} not in topic_peers",
&peer_id, &topic
);
}
self.fanout
.get_mut(&topic)
.map(|peers| peers.remove(peer_id));
}
self.px_peers.remove(peer_id);
self.outbound_peers.remove(peer_id);
}
self.peer_topics.remove(peer_id);
self.peer_protocols.remove(peer_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.remove_peer(peer_id);
}
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
_: &ConnectionId,
endpoint: &ConnectedPoint,
) {
if self.blacklisted_peers.contains(peer_id) {
return;
}
if let ConnectedPoint::Dialer { .. } = endpoint {
if !self.peer_topics.contains_key(peer_id) && !self.px_peers.contains(peer_id) {
self.outbound_peers.insert(peer_id.clone());
}
}
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
peer_score.add_ip(&peer_id, ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer_id,
endpoint
)
}
}
}
fn inject_connection_closed(
&mut self,
peer: &PeerId,
_: &ConnectionId,
endpoint: &ConnectedPoint,
) {
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
peer_score.remove_ip(peer, &ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer,
endpoint
)
}
}
}
fn inject_address_change(
&mut self,
peer: &PeerId,
_: &ConnectionId,
endpoint_old: &ConnectedPoint,
endpoint_new: &ConnectedPoint,
) {
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
peer_score.remove_ip(peer, &ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer,
endpoint_old
)
}
if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
peer_score.add_ip(&peer, ip);
} else {
trace!(
"Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
peer,
endpoint_new
)
}
}
}
fn inject_event(
&mut self,
propagation_source: PeerId,
_: ConnectionId,
handler_event: HandlerEvent,
) {
match handler_event {
HandlerEvent::PeerKind(kind) => {
if let PeerKind::NotSupported = kind {
debug!(
"Peer does not support gossipsub protocols. {}",
propagation_source
);
self.inject_disconnected(&propagation_source);
} else if let Some(old_kind) = self.peer_protocols.get_mut(&propagation_source) {
debug!(
"New peer type found: {} for peer: {}",
kind, propagation_source
);
if let PeerKind::Floodsub = *old_kind {
*old_kind = kind;
}
}
}
HandlerEvent::Message {
rpc,
invalid_messages,
} => {
if !rpc.subscriptions.is_empty() {
self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
}
if let (true, _) =
self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
{
debug!("RPC Dropped from greylisted peer {}", propagation_source);
return;
}
if self.peer_score.is_some() {
for (raw_message, validation_error) in invalid_messages {
self.handle_invalid_message(
&propagation_source,
raw_message,
validation_error,
)
}
} else {
for (message, validation_error) in invalid_messages {
warn!(
"Invalid message. Reason: {:?} propagation_peer {} source {:?}",
validation_error,
propagation_source.to_string(),
message.source
);
}
}
for (count, raw_message) in rpc.messages.into_iter().enumerate() {
if self.config.max_messages_per_rpc().is_some()
&& Some(count) >= self.config.max_messages_per_rpc()
{
warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
break;
}
self.handle_received_message(raw_message, &propagation_source);
}
let mut ihave_msgs = vec![];
let mut graft_msgs = vec![];
let mut prune_msgs = vec![];
for control_msg in rpc.control_msgs {
match control_msg {
GossipsubControlAction::IHave {
topic_hash,
message_ids,
} => {
ihave_msgs.push((topic_hash, message_ids));
}
GossipsubControlAction::IWant { message_ids } => {
self.handle_iwant(&propagation_source, message_ids)
}
GossipsubControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
GossipsubControlAction::Prune {
topic_hash,
peers,
backoff,
} => prune_msgs.push((topic_hash, peers, backoff)),
}
}
if !ihave_msgs.is_empty() {
self.handle_ihave(&propagation_source, ihave_msgs);
}
if !graft_msgs.is_empty() {
self.handle_graft(&propagation_source, graft_msgs);
}
if !prune_msgs.is_empty() {
self.handle_prune(&propagation_source, prune_msgs);
}
}
}
}
fn poll(
&mut self,
cx: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(match event {
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event: send_event,
} => {
let event = Arc::try_unwrap(send_event).unwrap_or_else(|e| (*e).clone());
NetworkBehaviourAction::NotifyHandler {
peer_id,
event,
handler,
}
}
NetworkBehaviourAction::GenerateEvent(e) => {
NetworkBehaviourAction::GenerateEvent(e)
}
NetworkBehaviourAction::DialAddress { address } => {
NetworkBehaviourAction::DialAddress { address }
}
NetworkBehaviourAction::DialPeer { peer_id, condition } => {
NetworkBehaviourAction::DialPeer { peer_id, condition }
}
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
NetworkBehaviourAction::ReportObservedAddr { address, score }
}
});
}
if let Some((peer_score, _, interval, _)) = &mut self.peer_score {
while let Poll::Ready(Some(())) = interval.poll_next_unpin(cx) {
peer_score.refresh_scores();
}
}
while let Poll::Ready(Some(())) = self.heartbeat.poll_next_unpin(cx) {
self.heartbeat();
}
Poll::Pending
}
}
fn get_random_peers_dynamic(
topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
peer_protocols: &HashMap<PeerId, PeerKind>,
topic_hash: &TopicHash,
n_map: impl Fn(usize) -> usize,
mut f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
let mut gossip_peers = match topic_peers.get(topic_hash) {
Some(peer_list) => peer_list
.iter()
.cloned()
.filter(|p| {
f(p) && match peer_protocols.get(p) {
Some(PeerKind::Gossipsub) => true,
Some(PeerKind::Gossipsubv1_1) => true,
_ => false,
}
})
.collect(),
None => Vec::new(),
};
let n = n_map(gossip_peers.len());
if gossip_peers.len() <= n {
debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
return gossip_peers.into_iter().collect();
}
let mut rng = thread_rng();
gossip_peers.partial_shuffle(&mut rng, n);
debug!("RANDOM PEERS: Got {:?} peers", n);
gossip_peers.into_iter().take(n).collect()
}
fn get_random_peers(
topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
peer_protocols: &HashMap<PeerId, PeerKind>,
topic_hash: &TopicHash,
n: usize,
f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
get_random_peers_dynamic(topic_peers, peer_protocols, topic_hash, |_| n, f)
}
fn validate_config(
authenticity: &MessageAuthenticity,
validation_mode: &ValidationMode,
) -> Result<(), &'static str> {
match validation_mode {
ValidationMode::Anonymous => {
if authenticity.is_signing() {
return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
}
if !authenticity.is_anonymous() {
return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
}
}
ValidationMode::Strict => {
if !authenticity.is_signing() {
return Err(
"Messages will be
published unsigned and incoming unsigned messages will be rejected. Consider adjusting
the validation or privacy settings in the config"
);
}
}
_ => {}
}
Ok(())
}
impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Gossipsub<C, F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Gossipsub")
.field("config", &self.config)
.field("events", &self.events)
.field("control_pool", &self.control_pool)
.field("publish_config", &self.publish_config)
.field("topic_peers", &self.topic_peers)
.field("peer_topics", &self.peer_topics)
.field("mesh", &self.mesh)
.field("fanout", &self.fanout)
.field("fanout_last_pub", &self.fanout_last_pub)
.field("mcache", &self.mcache)
.field("heartbeat", &self.heartbeat)
.finish()
}
}
impl fmt::Debug for PublishConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PublishConfig::Signing { author, .. } => {
f.write_fmt(format_args!("PublishConfig::Signing({})", author))
}
PublishConfig::Author(author) => {
f.write_fmt(format_args!("PublishConfig::Author({})", author))
}
PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
}
}
}
#[cfg(test)]
mod local_test {
use super::*;
use crate::IdentTopic;
use asynchronous_codec::Encoder;
use quickcheck::*;
use rand::Rng;
fn empty_rpc() -> GossipsubRpc {
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: Vec::new(),
}
}
fn test_message() -> RawGossipsubMessage {
RawGossipsubMessage {
source: Some(PeerId::random()),
data: vec![0; 100],
sequence_number: None,
topic: TopicHash::from_raw("test_topic"),
signature: None,
key: None,
validated: false,
}
}
fn test_subscription() -> GossipsubSubscription {
GossipsubSubscription {
action: GossipsubSubscriptionAction::Subscribe,
topic_hash: IdentTopic::new("TestTopic").hash(),
}
}
fn test_control() -> GossipsubControlAction {
GossipsubControlAction::IHave {
topic_hash: IdentTopic::new("TestTopic").hash(),
message_ids: vec![MessageId(vec![12u8]); 5],
}
}
impl Arbitrary for GossipsubRpc {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
let mut rpc = empty_rpc();
for _ in 0..g.gen_range(0, 10) {
rpc.subscriptions.push(test_subscription());
}
for _ in 0..g.gen_range(0, 10) {
rpc.messages.push(test_message());
}
for _ in 0..g.gen_range(0, 10) {
rpc.control_msgs.push(test_control());
}
rpc
}
}
#[test]
fn test_message_fragmentation_deterministic() {
let max_transmit_size = 500;
let config = crate::GossipsubConfigBuilder::default()
.max_transmit_size(max_transmit_size)
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();
let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();
let mut rpc = empty_rpc();
rpc.messages.push(test_message());
let mut rpc_proto = rpc.clone().into_protobuf();
let fragmented_messages = gs.fragment_message(Arc::new(rpc_proto.clone())).unwrap();
assert_eq!(
fragmented_messages,
vec![Arc::new(rpc_proto.clone())],
"Messages under the limit shouldn't be fragmented"
);
while rpc_proto.encoded_len() < max_transmit_size {
rpc.messages.push(test_message());
rpc_proto = rpc.clone().into_protobuf();
}
let fragmented_messages = gs
.fragment_message(Arc::new(rpc_proto))
.expect("Should be able to fragment the messages");
assert!(
fragmented_messages.len() > 1,
"the message should be fragmented"
);
for message in fragmented_messages {
assert!(
message.encoded_len() < max_transmit_size,
"all messages should be less than the transmission size"
);
}
}
#[test]
fn test_message_fragmentation() {
fn prop(rpc: GossipsubRpc) {
let max_transmit_size = 500;
let config = crate::GossipsubConfigBuilder::default()
.max_transmit_size(max_transmit_size)
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();
let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();
let mut length_codec = unsigned_varint::codec::UviBytes::default();
length_codec.set_max_len(max_transmit_size);
let mut codec =
crate::protocol::GossipsubCodec::new(length_codec, ValidationMode::Permissive);
let rpc_proto = rpc.into_protobuf();
let fragmented_messages = gs
.fragment_message(Arc::new(rpc_proto.clone()))
.expect("Messages must be valid");
if rpc_proto.encoded_len() < max_transmit_size {
assert_eq!(
fragmented_messages.len(),
1,
"the message should not be fragmented"
);
} else {
assert!(
fragmented_messages.len() > 1,
"the message should be fragmented"
);
}
for message in fragmented_messages {
assert!(
message.encoded_len() < max_transmit_size,
"all messages should be less than the transmission size: list size {} max size{}", message.encoded_len(), max_transmit_size
);
let mut buf = bytes::BytesMut::with_capacity(message.encoded_len());
codec
.encode(Arc::try_unwrap(message).unwrap(), &mut buf)
.unwrap()
}
}
QuickCheck::new()
.max_tests(100)
.quickcheck(prop as fn(_) -> _)
}
}