use crate::config::GossipsubConfig;
use crate::handler::GossipsubHandler;
use crate::mcache::MessageCache;
use crate::protocol::{
GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction,
MessageId,
};
use crate::topic::{Topic, TopicHash};
use futures::prelude::*;
use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
use libp2p_swarm::{
NetworkBehaviour,
NetworkBehaviourAction,
NotifyHandler,
PollParameters,
ProtocolsHandler
};
use log::{debug, error, info, trace, warn};
use lru::LruCache;
use rand;
use rand::{seq::SliceRandom, thread_rng};
use std::{
collections::hash_map::HashMap,
collections::HashSet,
collections::VecDeque,
iter,
sync::Arc,
task::{Context, Poll},
};
use wasm_timer::{Instant, Interval};
mod tests;
pub struct Gossipsub {
config: GossipsubConfig,
events: VecDeque<NetworkBehaviourAction<Arc<GossipsubRpc>, GossipsubEvent>>,
control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>,
local_peer_id: PeerId,
topic_peers: HashMap<TopicHash, Vec<PeerId>>,
peer_topics: HashMap<PeerId, Vec<TopicHash>>,
mesh: HashMap<TopicHash, Vec<PeerId>>,
fanout: HashMap<TopicHash, Vec<PeerId>>,
fanout_last_pub: HashMap<TopicHash, Instant>,
mcache: MessageCache,
received: LruCache<MessageId, ()>,
heartbeat: Interval,
}
impl Gossipsub {
pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig) -> Self {
let local_peer_id = if gs_config.no_source_id {
PeerId::from_bytes(crate::config::IDENTITY_SOURCE.to_vec()).expect("Valid peer id")
} else {
local_peer_id
};
Gossipsub {
config: gs_config.clone(),
events: VecDeque::new(),
control_pool: HashMap::new(),
local_peer_id,
topic_peers: HashMap::new(),
peer_topics: HashMap::new(),
mesh: HashMap::new(),
fanout: HashMap::new(),
fanout_last_pub: HashMap::new(),
mcache: MessageCache::new(
gs_config.history_gossip,
gs_config.history_length,
gs_config.message_id_fn,
),
received: LruCache::new(256),
heartbeat: Interval::new_at(
Instant::now() + gs_config.heartbeat_initial_delay,
gs_config.heartbeat_interval,
),
}
}
pub fn subscribe(&mut self, topic: Topic) -> bool {
debug!("Subscribing to topic: {}", topic);
let topic_hash = self.topic_hash(topic.clone());
if self.mesh.get(&topic_hash).is_some() {
debug!("Topic: {} is already in the mesh.", topic);
return false;
}
if let Some(peer_list) = self.topic_peers.get(&topic_hash) {
let mut fixed_event = None;
if fixed_event.is_none() {
fixed_event = Some(Arc::new(GossipsubRpc {
messages: Vec::new(),
subscriptions: vec![GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
}],
control_msgs: Vec::new(),
}));
}
let event = fixed_event.expect("event has been initialised");
for peer in peer_list {
debug!("Sending SUBSCRIBE to peer: {:?}", peer);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
handler: NotifyHandler::Any,
event: event.clone(),
});
}
}
self.join(&topic_hash);
info!("Subscribed to topic: {}", topic);
true
}
pub fn unsubscribe(&mut self, topic: Topic) -> bool {
debug!("Unsubscribing from topic: {}", topic);
let topic_hash = &self.topic_hash(topic);
if self.mesh.get(topic_hash).is_none() {
debug!("Already unsubscribed from topic: {:?}", topic_hash);
return false;
}
let mut fixed_event = None;
if let Some(peer_list) = self.topic_peers.get(topic_hash) {
if fixed_event.is_none() {
fixed_event = Some(Arc::new(GossipsubRpc {
messages: Vec::new(),
subscriptions: vec![GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Unsubscribe,
}],
control_msgs: Vec::new(),
}));
}
let event = fixed_event.expect("event has been initialised");
for peer in peer_list {
debug!("Sending UNSUBSCRIBE to peer: {:?}", peer);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
event: event.clone(),
handler: NotifyHandler::Any,
});
}
}
self.leave(&topic_hash);
info!("Unsubscribed from topic: {:?}", topic_hash);
true
}
pub fn publish(&mut self, topic: &Topic, data: impl Into<Vec<u8>>) {
self.publish_many(iter::once(topic.clone()), data)
}
pub fn publish_many(
&mut self,
topic: impl IntoIterator<Item = Topic>,
data: impl Into<Vec<u8>>,
) {
let message = GossipsubMessage {
source: self.local_peer_id.clone(),
data: data.into(),
sequence_number: rand::random(),
topics: topic.into_iter().map(|t| self.topic_hash(t)).collect(),
};
debug!(
"Publishing message: {:?}",
(self.config.message_id_fn)(&message)
);
let local_peer_id = self.local_peer_id.clone();
self.forward_msg(message.clone(), &local_peer_id);
let mut recipient_peers = HashSet::new();
for topic_hash in &message.topics {
if self.mesh.get(&topic_hash).is_none() {
debug!("Topic: {:?} not in the mesh", topic_hash);
if self.fanout.contains_key(&topic_hash) {
for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
recipient_peers.insert(peer.clone());
}
} else {
let mesh_n = self.config.mesh_n;
let new_peers =
Self::get_random_peers(&self.topic_peers, &topic_hash, mesh_n, {
|_| true
});
self.fanout.insert(topic_hash.clone(), new_peers.clone());
for peer in new_peers {
debug!("Peer added to fanout: {:?}", peer);
recipient_peers.insert(peer.clone());
}
}
self.fanout_last_pub
.insert(topic_hash.clone(), Instant::now());
}
}
let msg_id = (self.config.message_id_fn)(&message);
self.mcache.put(message.clone());
self.received.put(msg_id.clone(), ());
info!("Published message: {:?}", msg_id);
let event = Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: vec![message],
control_msgs: Vec::new(),
});
for peer_id in recipient_peers.iter() {
debug!("Sending message to peer: {:?}", peer_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
event: event.clone(),
handler: NotifyHandler::Any,
});
}
}
pub fn propagate_message(
&mut self,
message_id: &MessageId,
propagation_source: &PeerId,
) -> bool {
let message = match self.mcache.get(message_id) {
Some(message) => message.clone(),
None => {
warn!(
"Message not in cache. Ignoring forwarding. Message Id: {}",
message_id.0
);
return false;
}
};
self.forward_msg(message, propagation_source);
true
}
fn join(&mut self, topic_hash: &TopicHash) {
debug!("Running JOIN for topic: {:?}", topic_hash);
if self.mesh.contains_key(topic_hash) {
info!("JOIN: The topic is already in the mesh, ignoring JOIN");
return;
}
let mut added_peers = vec![];
if let Some((_, peers)) = self.fanout.remove_entry(topic_hash) {
debug!(
"JOIN: Removing peers from the fanout for topic: {:?}",
topic_hash
);
let add_peers = std::cmp::min(peers.len(), self.config.mesh_n);
debug!(
"JOIN: Adding {:?} peers from the fanout for topic: {:?}",
add_peers, topic_hash
);
added_peers.extend_from_slice(&peers[..add_peers]);
self.mesh
.insert(topic_hash.clone(), peers[..add_peers].to_vec());
self.fanout_last_pub.remove(topic_hash);
}
if added_peers.len() < self.config.mesh_n {
let new_peers = Self::get_random_peers(
&self.topic_peers,
topic_hash,
self.config.mesh_n - added_peers.len(),
|_| true,
);
added_peers.extend_from_slice(&new_peers);
debug!(
"JOIN: Inserting {:?} random peers into the mesh",
new_peers.len()
);
let mesh_peers = self
.mesh
.entry(topic_hash.clone())
.or_insert_with(|| Vec::new());
mesh_peers.extend_from_slice(&new_peers);
}
for peer_id in added_peers {
info!("JOIN: Sending Graft message to peer: {:?}", peer_id);
Self::control_pool_add(
&mut self.control_pool,
peer_id.clone(),
GossipsubControlAction::Graft {
topic_hash: topic_hash.clone(),
},
);
}
debug!("Completed JOIN for topic: {:?}", topic_hash);
}
fn leave(&mut self, topic_hash: &TopicHash) {
debug!("Running LEAVE for topic {:?}", topic_hash);
if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
for peer in peers {
info!("LEAVE: Sending PRUNE to peer: {:?}", peer);
Self::control_pool_add(
&mut self.control_pool,
peer.clone(),
GossipsubControlAction::Prune {
topic_hash: topic_hash.clone(),
},
);
}
}
debug!("Completed LEAVE for topic: {:?}", topic_hash);
}
fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
debug!("Handling IHAVE for peer: {:?}", peer_id);
let mut iwant_ids = HashSet::new();
for (topic, ids) in ihave_msgs {
if !self.mesh.contains_key(&topic) {
debug!(
"IHAVE: Ignoring IHAVE - Not subscribed to topic: {:?}",
topic
);
continue;
}
for id in ids {
if !self.received.contains(&id) {
iwant_ids.insert(id);
}
}
}
if !iwant_ids.is_empty() {
debug!("IHAVE: Sending IWANT message");
Self::control_pool_add(
&mut self.control_pool,
peer_id.clone(),
GossipsubControlAction::IWant {
message_ids: iwant_ids.iter().cloned().collect(),
},
);
}
debug!("Completed IHAVE handling for peer: {:?}", peer_id);
}
fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
debug!("Handling IWANT for peer: {:?}", peer_id);
let mut cached_messages = HashMap::new();
for id in iwant_msgs {
if let Some(msg) = self.mcache.get(&id) {
cached_messages.insert(id.clone(), msg.clone());
}
}
if !cached_messages.is_empty() {
debug!("IWANT: Sending cached messages to peer: {:?}", peer_id);
let message_list = cached_messages.into_iter().map(|entry| entry.1).collect();
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::Any,
event: Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: message_list,
control_msgs: Vec::new(),
}),
});
}
debug!("Completed IWANT handling for peer: {:?}", peer_id);
}
fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
debug!("Handling GRAFT message for peer: {:?}", peer_id);
let mut to_prune_topics = HashSet::new();
for topic_hash in topics {
if let Some(peers) = self.mesh.get_mut(&topic_hash) {
info!(
"GRAFT: Mesh link added for peer: {:?} in topic: {:?}",
peer_id, topic_hash
);
if !peers.contains(peer_id) {
peers.push(peer_id.clone());
}
} else {
to_prune_topics.insert(topic_hash.clone());
}
}
if !to_prune_topics.is_empty() {
let prune_messages = to_prune_topics
.iter()
.map(|t| GossipsubControlAction::Prune {
topic_hash: t.clone(),
})
.collect();
info!(
"GRAFT: Not subscribed to topics - Sending PRUNE to peer: {:?}",
peer_id
);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::Any,
event: Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: prune_messages,
}),
});
}
debug!("Completed GRAFT handling for peer: {:?}", peer_id);
}
fn handle_prune(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
debug!("Handling PRUNE message for peer: {:?}", peer_id);
for topic_hash in topics {
if let Some(peers) = self.mesh.get_mut(&topic_hash) {
info!(
"PRUNE: Removing peer: {:?} from the mesh for topic: {:?}",
peer_id, topic_hash
);
peers.retain(|p| p != peer_id);
}
}
debug!("Completed PRUNE handling for peer: {:?}", peer_id);
}
fn handle_received_message(&mut self, msg: GossipsubMessage, propagation_source: &PeerId) {
let msg_id = (self.config.message_id_fn)(&msg);
debug!(
"Handling message: {:?} from peer: {:?}",
msg_id, propagation_source
);
if self.received.put(msg_id.clone(), ()).is_some() {
debug!("Message already received, ignoring. Message: {:?}", msg_id);
return;
}
self.mcache.put(msg.clone());
if self.mesh.keys().any(|t| msg.topics.iter().any(|u| t == u)) {
debug!("Sending received message to user");
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Message(propagation_source.clone(), msg_id, msg.clone()),
));
}
if !self.config.manual_propagation {
let message_id = (self.config.message_id_fn)(&msg);
self.forward_msg(msg, propagation_source);
debug!("Completed message handling for message: {:?}", message_id);
}
}
fn handle_received_subscriptions(
&mut self,
subscriptions: &[GossipsubSubscription],
propagation_source: &PeerId,
) {
debug!(
"Handling subscriptions: {:?}, from source: {:?}",
subscriptions, propagation_source
);
let subscribed_topics = match self.peer_topics.get_mut(propagation_source) {
Some(topics) => topics,
None => {
error!("Subscription by unknown peer: {:?}", &propagation_source);
return;
}
};
for subscription in subscriptions {
let peer_list = self
.topic_peers
.entry(subscription.topic_hash.clone())
.or_insert_with(Vec::new);
match subscription.action {
GossipsubSubscriptionAction::Subscribe => {
if !peer_list.contains(&propagation_source) {
debug!(
"SUBSCRIPTION: topic_peer: Adding gossip peer: {:?} to topic: {:?}",
propagation_source, subscription.topic_hash
);
peer_list.push(propagation_source.clone());
}
if !subscribed_topics.contains(&subscription.topic_hash) {
info!(
"SUBSCRIPTION: Adding peer: {:?} to topic: {:?}",
propagation_source, subscription.topic_hash
);
subscribed_topics.push(subscription.topic_hash.clone());
}
if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
if peers.len() < self.config.mesh_n_low {
debug!(
"SUBSCRIPTION: Adding peer {:?} to the mesh",
propagation_source,
);
}
peers.push(propagation_source.clone());
}
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Subscribed {
peer_id: propagation_source.clone(),
topic: subscription.topic_hash.clone(),
},
));
}
GossipsubSubscriptionAction::Unsubscribe => {
if let Some(pos) = peer_list.iter().position(|p| p == propagation_source) {
info!(
"SUBSCRIPTION: Removing gossip peer: {:?} from topic: {:?}",
propagation_source, subscription.topic_hash
);
peer_list.remove(pos);
}
if let Some(pos) = subscribed_topics
.iter()
.position(|t| t == &subscription.topic_hash)
{
subscribed_topics.remove(pos);
}
if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
peers.retain(|peer| peer != propagation_source);
}
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Unsubscribed {
peer_id: propagation_source.clone(),
topic: subscription.topic_hash.clone(),
},
));
}
}
}
trace!(
"Completed handling subscriptions from source: {:?}",
propagation_source
);
}
fn heartbeat(&mut self) {
debug!("Starting heartbeat");
let mut to_graft = HashMap::new();
let mut to_prune = HashMap::new();
for (topic_hash, peers) in self.mesh.iter_mut() {
if peers.len() < self.config.mesh_n_low {
debug!(
"HEARTBEAT: Mesh low. Topic: {:?} Contains: {:?} needs: {:?}",
topic_hash.clone().into_string(),
peers.len(),
self.config.mesh_n_low
);
let desired_peers = self.config.mesh_n - peers.len();
let peer_list =
Self::get_random_peers(&self.topic_peers, topic_hash, desired_peers, {
|peer| !peers.contains(peer)
});
for peer in &peer_list {
let current_topic = to_graft.entry(peer.clone()).or_insert_with(|| vec![]);
current_topic.push(topic_hash.clone());
}
debug!("Updating mesh, new mesh: {:?}", peer_list);
peers.extend(peer_list);
}
if peers.len() > self.config.mesh_n_high {
debug!(
"HEARTBEAT: Mesh high. Topic: {:?} Contains: {:?} needs: {:?}",
topic_hash,
peers.len(),
self.config.mesh_n_high
);
let excess_peer_no = peers.len() - self.config.mesh_n;
let mut rng = thread_rng();
peers.shuffle(&mut rng);
for _ in 0..excess_peer_no {
let peer = peers
.pop()
.expect("There should always be enough peers to remove");
let current_topic = to_prune.entry(peer).or_insert_with(|| vec![]);
current_topic.push(topic_hash.clone());
}
}
}
{
let fanout = &mut self.fanout;
let fanout_ttl = self.config.fanout_ttl;
self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
if *last_pub_time + fanout_ttl < Instant::now() {
debug!(
"HEARTBEAT: Fanout topic removed due to timeout. Topic: {:?}",
topic_hash
);
fanout.remove(&topic_hash);
return false;
}
true
});
}
for (topic_hash, peers) in self.fanout.iter_mut() {
let mut to_remove_peers = Vec::new();
for peer in peers.iter() {
match self.peer_topics.get(peer) {
Some(topics) => {
if !topics.contains(&topic_hash) {
debug!(
"HEARTBEAT: Peer removed from fanout for topic: {:?}",
topic_hash
);
to_remove_peers.push(peer.clone());
}
}
None => {
to_remove_peers.push(peer.clone());
}
}
}
peers.retain(|peer| to_remove_peers.contains(&peer));
if peers.len() < self.config.mesh_n {
debug!(
"HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
peers.len(),
self.config.mesh_n
);
let needed_peers = self.config.mesh_n - peers.len();
let new_peers =
Self::get_random_peers(&self.topic_peers, topic_hash, needed_peers, |peer| {
!peers.contains(peer)
});
peers.extend(new_peers);
}
}
self.emit_gossip();
if !to_graft.is_empty() | !to_prune.is_empty() {
self.send_graft_prune(to_graft, to_prune);
}
self.flush_control_pool();
self.mcache.shift();
debug!("Completed Heartbeat");
}
fn emit_gossip(&mut self) {
debug!("Started gossip");
for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
let message_ids = self.mcache.get_gossip_ids(&topic_hash);
if message_ids.is_empty() {
return;
}
let to_msg_peers = Self::get_random_peers(
&self.topic_peers,
&topic_hash,
self.config.gossip_lazy,
|peer| !peers.contains(peer),
);
for peer in to_msg_peers {
Self::control_pool_add(
&mut self.control_pool,
peer.clone(),
GossipsubControlAction::IHave {
topic_hash: topic_hash.clone(),
message_ids: message_ids.clone(),
},
);
}
}
debug!("Completed gossip");
}
fn send_graft_prune(
&mut self,
to_graft: HashMap<PeerId, Vec<TopicHash>>,
mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
) {
for (peer, topics) in to_graft.iter() {
let mut grafts: Vec<GossipsubControlAction> = topics
.iter()
.map(|topic_hash| GossipsubControlAction::Graft {
topic_hash: topic_hash.clone(),
})
.collect();
let mut prunes: Vec<GossipsubControlAction> = to_prune
.remove(peer)
.unwrap_or_else(|| vec![])
.iter()
.map(|topic_hash| GossipsubControlAction::Prune {
topic_hash: topic_hash.clone(),
})
.collect();
grafts.append(&mut prunes);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
handler: NotifyHandler::Any,
event: Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: grafts,
}),
});
}
for (peer, topics) in to_prune.iter() {
let remaining_prunes = topics
.iter()
.map(|topic_hash| GossipsubControlAction::Prune {
topic_hash: topic_hash.clone(),
})
.collect();
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
handler: NotifyHandler::Any,
event: Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: remaining_prunes,
}),
});
}
}
fn forward_msg(&mut self, message: GossipsubMessage, source: &PeerId) {
let msg_id = (self.config.message_id_fn)(&message);
debug!("Forwarding message: {:?}", msg_id);
let mut recipient_peers = HashSet::new();
for topic in &message.topics {
if let Some(mesh_peers) = self.mesh.get(&topic) {
for peer_id in mesh_peers {
if peer_id != source {
recipient_peers.insert(peer_id.clone());
}
}
}
}
if !recipient_peers.is_empty() {
let event = Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
control_msgs: Vec::new(),
});
for peer in recipient_peers.iter() {
debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
event: event.clone(),
handler: NotifyHandler::Any,
});
}
}
debug!("Completed forwarding message");
}
fn get_random_peers(
topic_peers: &HashMap<TopicHash, Vec<PeerId>>,
topic_hash: &TopicHash,
n: usize,
mut f: impl FnMut(&PeerId) -> bool,
) -> Vec<PeerId> {
let mut gossip_peers = match topic_peers.get(topic_hash) {
Some(peer_list) => peer_list.iter().cloned().filter(|p| f(p)).collect(),
None => Vec::new(),
};
if gossip_peers.len() <= n {
debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
return gossip_peers.to_vec();
}
let mut rng = thread_rng();
gossip_peers.partial_shuffle(&mut rng, n);
debug!("RANDOM PEERS: Got {:?} peers", n);
gossip_peers[..n].to_vec()
}
fn control_pool_add(
control_pool: &mut HashMap<PeerId, Vec<GossipsubControlAction>>,
peer: PeerId,
control: GossipsubControlAction,
) {
control_pool
.entry(peer.clone())
.or_insert_with(Vec::new)
.push(control);
}
fn topic_hash(&self, topic: Topic) -> TopicHash {
if self.config.hash_topics {
topic.sha256_hash()
} else {
topic.no_hash()
}
}
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain() {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer,
handler: NotifyHandler::Any,
event: Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: controls,
}),
});
}
}
}
impl NetworkBehaviour for Gossipsub {
type ProtocolsHandler = GossipsubHandler;
type OutEvent = GossipsubEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
GossipsubHandler::new(
self.config.protocol_id.clone(),
self.config.max_transmit_size,
)
}
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, id: &PeerId) {
info!("New peer connected: {:?}", id);
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}
if !subscriptions.is_empty() {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: id.clone(),
handler: NotifyHandler::Any,
event: Arc::new(GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}),
});
}
self.peer_topics.insert(id.clone(), Vec::new());
}
fn inject_disconnected(&mut self, id: &PeerId) {
debug!("Peer disconnected: {:?}", id);
{
let topics = match self.peer_topics.get(id) {
Some(topics) => (topics),
None => {
warn!("Disconnected node, not in connected nodes");
return;
}
};
for topic in topics {
if let Some(mesh_peers) = self.mesh.get_mut(&topic) {
if let Some(pos) = mesh_peers.iter().position(|p| p == id) {
mesh_peers.remove(pos);
}
}
if let Some(peer_list) = self.topic_peers.get_mut(&topic) {
if let Some(pos) = peer_list.iter().position(|p| p == id) {
peer_list.remove(pos);
}
else {
warn!("Disconnected node: {:?} not in topic_peers peer list", &id);
}
} else {
warn!(
"Disconnected node: {:?} with topic: {:?} not in topic_peers",
&id, &topic
);
}
self.fanout
.get_mut(&topic)
.map(|peers| peers.retain(|p| p != id));
}
}
let was_in = self.peer_topics.remove(id);
debug_assert!(was_in.is_some());
}
fn inject_event(&mut self, propagation_source: PeerId, _: ConnectionId, event: GossipsubRpc) {
self.handle_received_subscriptions(&event.subscriptions, &propagation_source);
for message in event.messages {
self.handle_received_message(message, &propagation_source);
}
let mut ihave_msgs = vec![];
let mut graft_msgs = vec![];
let mut prune_msgs = vec![];
for control_msg in event.control_msgs {
match control_msg {
GossipsubControlAction::IHave {
topic_hash,
message_ids,
} => {
ihave_msgs.push((topic_hash, message_ids));
}
GossipsubControlAction::IWant { message_ids } => {
self.handle_iwant(&propagation_source, message_ids)
}
GossipsubControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
GossipsubControlAction::Prune { topic_hash } => prune_msgs.push(topic_hash),
}
}
if !ihave_msgs.is_empty() {
self.handle_ihave(&propagation_source, ihave_msgs);
}
if !graft_msgs.is_empty() {
self.handle_graft(&propagation_source, graft_msgs);
}
if !prune_msgs.is_empty() {
self.handle_prune(&propagation_source, prune_msgs);
}
}
fn poll(
&mut self,
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
if let Some(event) = self.events.pop_front() {
match event {
NetworkBehaviourAction::NotifyHandler {
peer_id, handler, event: send_event,
} => match Arc::try_unwrap(send_event) {
Ok(event) => {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id, event, handler
});
}
Err(event) => {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id, event: (*event).clone(), handler
});
}
},
NetworkBehaviourAction::GenerateEvent(e) => {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(e));
}
NetworkBehaviourAction::DialAddress { address } => {
return Poll::Ready(NetworkBehaviourAction::DialAddress { address });
}
NetworkBehaviourAction::DialPeer { peer_id, condition } => {
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition });
}
NetworkBehaviourAction::ReportObservedAddr { address } => {
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address });
}
}
}
while let Poll::Ready(Some(())) = self.heartbeat.poll_next_unpin(cx) {
self.heartbeat();
}
Poll::Pending
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GossipsubRpc {
pub messages: Vec<GossipsubMessage>,
pub subscriptions: Vec<GossipsubSubscription>,
pub control_msgs: Vec<GossipsubControlAction>,
}
#[derive(Debug)]
pub enum GossipsubEvent {
Message(PeerId, MessageId, GossipsubMessage),
Subscribed {
peer_id: PeerId,
topic: TopicHash,
},
Unsubscribed {
peer_id: PeerId,
topic: TopicHash,
},
}