use crate::protocol::{FloodsubProtocol, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use crate::topic::Topic;
use crate::FloodsubConfig;
use cuckoofilter::{CuckooError, CuckooFilter};
use fnv::FnvHashSet;
use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
use libp2p_swarm::{
NetworkBehaviour,
NetworkBehaviourAction,
PollParameters,
ProtocolsHandler,
OneShotHandler,
NotifyHandler,
DialPeerCondition,
};
use log::warn;
use rand;
use smallvec::SmallVec;
use std::{collections::VecDeque, iter};
use std::collections::hash_map::{DefaultHasher, HashMap};
use std::task::{Context, Poll};
pub struct Floodsub {
events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,
config: FloodsubConfig,
target_peers: FnvHashSet<PeerId>,
connected_peers: HashMap<PeerId, SmallVec<[Topic; 8]>>,
subscribed_topics: SmallVec<[Topic; 16]>,
received: CuckooFilter<DefaultHasher>,
}
impl Floodsub {
pub fn new(local_peer_id: PeerId) -> Self {
Self::from_config(FloodsubConfig::new(local_peer_id))
}
pub fn from_config(config: FloodsubConfig) -> Self {
Floodsub {
events: VecDeque::new(),
config,
target_peers: FnvHashSet::default(),
connected_peers: HashMap::new(),
subscribed_topics: SmallVec::new(),
received: CuckooFilter::new(),
}
}
#[inline]
pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) {
if self.connected_peers.contains_key(&peer_id) {
for topic in self.subscribed_topics.iter().cloned() {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::Any,
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
topic,
action: FloodsubSubscriptionAction::Subscribe,
}],
},
});
}
}
if self.target_peers.insert(peer_id.clone()) {
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id, condition: DialPeerCondition::Disconnected
});
}
}
#[inline]
pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) {
self.target_peers.remove(peer_id);
}
pub fn subscribe(&mut self, topic: Topic) -> bool {
if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
return false;
}
for peer in self.connected_peers.keys() {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
handler: NotifyHandler::Any,
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
topic: topic.clone(),
action: FloodsubSubscriptionAction::Subscribe,
}],
},
});
}
self.subscribed_topics.push(topic);
true
}
pub fn unsubscribe(&mut self, topic: Topic) -> bool {
let pos = match self.subscribed_topics.iter().position(|t| *t == topic) {
Some(pos) => pos,
None => return false
};
self.subscribed_topics.remove(pos);
for peer in self.connected_peers.keys() {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
handler: NotifyHandler::Any,
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
topic: topic.clone(),
action: FloodsubSubscriptionAction::Unsubscribe,
}],
},
});
}
true
}
pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
self.publish_many(iter::once(topic), data)
}
pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
self.publish_many_any(iter::once(topic), data)
}
pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
self.publish_many_inner(topic, data, true)
}
pub fn publish_many_any(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
self.publish_many_inner(topic, data, false)
}
fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
let message = FloodsubMessage {
source: self.config.local_peer_id.clone(),
data: data.into(),
sequence_number: rand::random::<[u8; 20]>().to_vec(),
topics: topic.into_iter().map(Into::into).collect(),
};
let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u));
if self_subscribed {
if let Err(e @ CuckooError::NotEnoughSpace) = self.received.add(&message) {
warn!(
"Message was added to 'received' Cuckoofilter but some \
other message was removed as a consequence: {}", e,
);
}
if self.config.subscribe_local_messages {
self.events.push_back(
NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone())));
}
}
if check_self_subscriptions && !self_subscribed {
return
}
for (peer_id, sub_topic) in self.connected_peers.iter() {
if !sub_topic.iter().any(|t| message.topics.iter().any(|u| t == u)) {
continue;
}
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::Any,
event: FloodsubRpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
}
});
}
}
}
impl NetworkBehaviour for Floodsub {
type ProtocolsHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
type OutEvent = FloodsubEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
Default::default()
}
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, id: &PeerId) {
if self.target_peers.contains(id) {
for topic in self.subscribed_topics.iter().cloned() {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: id.clone(),
handler: NotifyHandler::Any,
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
topic,
action: FloodsubSubscriptionAction::Subscribe,
}],
},
});
}
}
self.connected_peers.insert(id.clone(), SmallVec::new());
}
fn inject_disconnected(&mut self, id: &PeerId) {
let was_in = self.connected_peers.remove(id);
debug_assert!(was_in.is_some());
if self.target_peers.contains(id) {
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: id.clone(),
condition: DialPeerCondition::Disconnected
});
}
}
fn inject_event(
&mut self,
propagation_source: PeerId,
_connection: ConnectionId,
event: InnerMessage,
) {
let event = match event {
InnerMessage::Rx(event) => event,
InnerMessage::Sent => return,
};
for subscription in event.subscriptions {
let remote_peer_topics = self.connected_peers
.get_mut(&propagation_source)
.expect("connected_peers is kept in sync with the peers we are connected to; we are guaranteed to only receive events from connected peers; QED");
match subscription.action {
FloodsubSubscriptionAction::Subscribe => {
if !remote_peer_topics.contains(&subscription.topic) {
remote_peer_topics.push(subscription.topic.clone());
}
self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Subscribed {
peer_id: propagation_source.clone(),
topic: subscription.topic,
}));
}
FloodsubSubscriptionAction::Unsubscribe => {
if let Some(pos) = remote_peer_topics.iter().position(|t| t == &subscription.topic ) {
remote_peer_topics.remove(pos);
}
self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Unsubscribed {
peer_id: propagation_source.clone(),
topic: subscription.topic,
}));
}
}
}
let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new();
for message in event.messages {
match self.received.test_and_add(&message) {
Ok(true) => {},
Ok(false) => continue,
Err(e @ CuckooError::NotEnoughSpace) => {
warn!(
"Message was added to 'received' Cuckoofilter but some \
other message was removed as a consequence: {}", e,
);
}
}
if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
let event = FloodsubEvent::Message(message.clone());
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
for (peer_id, subscr_topics) in self.connected_peers.iter() {
if peer_id == &propagation_source {
continue;
}
if !subscr_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
continue;
}
if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
rpcs_to_dispatch[pos].1.messages.push(message.clone());
} else {
rpcs_to_dispatch.push((peer_id.clone(), FloodsubRpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
}));
}
}
}
for (peer_id, rpc) in rpcs_to_dispatch {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: rpc,
});
}
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
Poll::Pending
}
}
pub enum InnerMessage {
Rx(FloodsubRpc),
Sent,
}
impl From<FloodsubRpc> for InnerMessage {
#[inline]
fn from(rpc: FloodsubRpc) -> InnerMessage {
InnerMessage::Rx(rpc)
}
}
impl From<()> for InnerMessage {
#[inline]
fn from(_: ()) -> InnerMessage {
InnerMessage::Sent
}
}
#[derive(Debug)]
pub enum FloodsubEvent {
Message(FloodsubMessage),
Subscribed {
peer_id: PeerId,
topic: Topic,
},
Unsubscribed {
peer_id: PeerId,
topic: Topic,
},
}