libp2p_floodsub/
layer.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::{
23        hash_map::{DefaultHasher, HashMap},
24        VecDeque,
25    },
26    iter,
27    task::{Context, Poll},
28};
29
30use bytes::Bytes;
31use cuckoofilter::{CuckooError, CuckooFilter};
32use fnv::FnvHashSet;
33use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
34use libp2p_identity::PeerId;
35use libp2p_swarm::{
36    behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm},
37    dial_opts::DialOpts,
38    CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler,
39    OneShotHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
40};
41use smallvec::SmallVec;
42
43use crate::{
44    protocol::{
45        FloodsubMessage, FloodsubProtocol, FloodsubRpc, FloodsubSubscription,
46        FloodsubSubscriptionAction,
47    },
48    topic::Topic,
49    FloodsubConfig,
50};
51
/// Network behaviour that handles the floodsub protocol.
///
/// Work destined for the swarm is buffered in `events` and handed out one item at a time
/// when the behaviour is polled.
pub struct Floodsub {
    /// Events that need to be yielded to the outside when polling.
    events: VecDeque<ToSwarm<FloodsubEvent, FloodsubRpc>>,

    /// Behaviour configuration. Supplies the local peer ID used as the source of published
    /// messages and the `subscribe_local_messages` flag.
    config: FloodsubConfig,

    /// List of peers to send messages to.
    target_peers: FnvHashSet<PeerId>,

    /// List of peers the network is connected to, and the topics that they're subscribed to.
    // TODO: filter out peers that don't support floodsub, so that we avoid hammering them with
    //       opened substreams
    connected_peers: HashMap<PeerId, SmallVec<[Topic; 8]>>,

    /// List of topics we're subscribed to. Necessary to filter out messages that we receive
    /// erroneously.
    subscribed_topics: SmallVec<[Topic; 16]>,

    /// We keep track of the messages we received (in the format `hash(source ID, seq_no)`) so
    /// that we don't dispatch the same message twice if we receive it twice on the network.
    received: CuckooFilter<DefaultHasher>,
}
75
76impl Floodsub {
77    /// Creates a `Floodsub` with default configuration.
78    pub fn new(local_peer_id: PeerId) -> Self {
79        Self::from_config(FloodsubConfig::new(local_peer_id))
80    }
81
82    /// Creates a `Floodsub` with the given configuration.
83    pub fn from_config(config: FloodsubConfig) -> Self {
84        Floodsub {
85            events: VecDeque::new(),
86            config,
87            target_peers: FnvHashSet::default(),
88            connected_peers: HashMap::new(),
89            subscribed_topics: SmallVec::new(),
90            received: CuckooFilter::new(),
91        }
92    }
93
94    /// Add a node to the list of nodes to propagate messages to.
95    #[inline]
96    pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) {
97        // Send our topics to this node if we're already connected to it.
98        if self.connected_peers.contains_key(&peer_id) {
99            for topic in self.subscribed_topics.iter().cloned() {
100                self.events.push_back(ToSwarm::NotifyHandler {
101                    peer_id,
102                    handler: NotifyHandler::Any,
103                    event: FloodsubRpc {
104                        messages: Vec::new(),
105                        subscriptions: vec![FloodsubSubscription {
106                            topic,
107                            action: FloodsubSubscriptionAction::Subscribe,
108                        }],
109                    },
110                });
111            }
112        }
113
114        if self.target_peers.insert(peer_id) {
115            self.events.push_back(ToSwarm::Dial {
116                opts: DialOpts::peer_id(peer_id).build(),
117            });
118        }
119    }
120
121    /// Remove a node from the list of nodes to propagate messages to.
122    #[inline]
123    pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) {
124        self.target_peers.remove(peer_id);
125    }
126
127    /// Subscribes to a topic.
128    ///
129    /// Returns true if the subscription worked. Returns false if we were already subscribed.
130    pub fn subscribe(&mut self, topic: Topic) -> bool {
131        if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
132            return false;
133        }
134
135        for peer in self.connected_peers.keys() {
136            self.events.push_back(ToSwarm::NotifyHandler {
137                peer_id: *peer,
138                handler: NotifyHandler::Any,
139                event: FloodsubRpc {
140                    messages: Vec::new(),
141                    subscriptions: vec![FloodsubSubscription {
142                        topic: topic.clone(),
143                        action: FloodsubSubscriptionAction::Subscribe,
144                    }],
145                },
146            });
147        }
148
149        self.subscribed_topics.push(topic);
150        true
151    }
152
153    /// Unsubscribes from a topic.
154    ///
155    /// Note that this only requires the topic name.
156    ///
157    /// Returns true if we were subscribed to this topic.
158    pub fn unsubscribe(&mut self, topic: Topic) -> bool {
159        let Some(pos) = self.subscribed_topics.iter().position(|t| *t == topic) else {
160            return false;
161        };
162
163        self.subscribed_topics.remove(pos);
164
165        for peer in self.connected_peers.keys() {
166            self.events.push_back(ToSwarm::NotifyHandler {
167                peer_id: *peer,
168                handler: NotifyHandler::Any,
169                event: FloodsubRpc {
170                    messages: Vec::new(),
171                    subscriptions: vec![FloodsubSubscription {
172                        topic: topic.clone(),
173                        action: FloodsubSubscriptionAction::Unsubscribe,
174                    }],
175                },
176            });
177        }
178
179        true
180    }
181
182    /// Publishes a message to the network, if we're subscribed to the topic only.
183    pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Bytes>) {
184        self.publish_many(iter::once(topic), data)
185    }
186
187    /// Publishes a message to the network, even if we're not subscribed to the topic.
188    pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Bytes>) {
189        self.publish_many_any(iter::once(topic), data)
190    }
191
192    /// Publishes a message with multiple topics to the network.
193    ///
194    ///
195    /// > **Note**: Doesn't do anything if we're not subscribed to any of the topics.
196    pub fn publish_many(
197        &mut self,
198        topic: impl IntoIterator<Item = impl Into<Topic>>,
199        data: impl Into<Bytes>,
200    ) {
201        self.publish_many_inner(topic, data, true)
202    }
203
204    /// Publishes a message with multiple topics to the network, even if we're not subscribed to any
205    /// of the topics.
206    pub fn publish_many_any(
207        &mut self,
208        topic: impl IntoIterator<Item = impl Into<Topic>>,
209        data: impl Into<Bytes>,
210    ) {
211        self.publish_many_inner(topic, data, false)
212    }
213
214    fn publish_many_inner(
215        &mut self,
216        topic: impl IntoIterator<Item = impl Into<Topic>>,
217        data: impl Into<Bytes>,
218        check_self_subscriptions: bool,
219    ) {
220        let message = FloodsubMessage {
221            source: self.config.local_peer_id,
222            data: data.into(),
223            // If the sequence numbers are predictable, then an attacker could flood the network
224            // with packets with the predetermined sequence numbers and absorb our legitimate
225            // messages. We therefore use a random number.
226            sequence_number: rand::random::<[u8; 20]>().to_vec(),
227            topics: topic.into_iter().map(Into::into).collect(),
228        };
229
230        let self_subscribed = self
231            .subscribed_topics
232            .iter()
233            .any(|t| message.topics.iter().any(|u| t == u));
234        if self_subscribed {
235            if let Err(e @ CuckooError::NotEnoughSpace) = self.received.add(&message) {
236                tracing::warn!(
237                    "Message was added to 'received' Cuckoofilter but some \
238                     other message was removed as a consequence: {}",
239                    e,
240                );
241            }
242            if self.config.subscribe_local_messages {
243                self.events
244                    .push_back(ToSwarm::GenerateEvent(FloodsubEvent::Message(
245                        message.clone(),
246                    )));
247            }
248        }
249        // Don't publish the message if we have to check subscriptions
250        // and we're not subscribed ourselves to any of the topics.
251        if check_self_subscriptions && !self_subscribed {
252            return;
253        }
254
255        // Send to peers we know are subscribed to the topic.
256        for (peer_id, sub_topic) in self.connected_peers.iter() {
257            // Peer must be in a communication list.
258            if !self.target_peers.contains(peer_id) {
259                continue;
260            }
261
262            // Peer must be subscribed for the topic.
263            if !sub_topic
264                .iter()
265                .any(|t| message.topics.iter().any(|u| t == u))
266            {
267                continue;
268            }
269
270            self.events.push_back(ToSwarm::NotifyHandler {
271                peer_id: *peer_id,
272                handler: NotifyHandler::Any,
273                event: FloodsubRpc {
274                    subscriptions: Vec::new(),
275                    messages: vec![message.clone()],
276                },
277            });
278        }
279    }
280
281    fn on_connection_established(
282        &mut self,
283        ConnectionEstablished {
284            peer_id,
285            other_established,
286            ..
287        }: ConnectionEstablished,
288    ) {
289        if other_established > 0 {
290            // We only care about the first time a peer connects.
291            return;
292        }
293
294        // We need to send our subscriptions to the newly-connected node.
295        if self.target_peers.contains(&peer_id) {
296            for topic in self.subscribed_topics.iter().cloned() {
297                self.events.push_back(ToSwarm::NotifyHandler {
298                    peer_id,
299                    handler: NotifyHandler::Any,
300                    event: FloodsubRpc {
301                        messages: Vec::new(),
302                        subscriptions: vec![FloodsubSubscription {
303                            topic,
304                            action: FloodsubSubscriptionAction::Subscribe,
305                        }],
306                    },
307                });
308            }
309        }
310
311        self.connected_peers.insert(peer_id, SmallVec::new());
312    }
313
314    fn on_connection_closed(
315        &mut self,
316        ConnectionClosed {
317            peer_id,
318            remaining_established,
319            ..
320        }: ConnectionClosed,
321    ) {
322        if remaining_established > 0 {
323            // we only care about peer disconnections
324            return;
325        }
326
327        let was_in = self.connected_peers.remove(&peer_id);
328        debug_assert!(was_in.is_some());
329
330        // We can be disconnected by the remote in case of inactivity for example, so we always
331        // try to reconnect.
332        if self.target_peers.contains(&peer_id) {
333            self.events.push_back(ToSwarm::Dial {
334                opts: DialOpts::peer_id(peer_id).build(),
335            });
336        }
337    }
338}
339
340impl NetworkBehaviour for Floodsub {
341    type ConnectionHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
342    type ToSwarm = FloodsubEvent;
343
344    fn handle_established_inbound_connection(
345        &mut self,
346        _: ConnectionId,
347        _: PeerId,
348        _: &Multiaddr,
349        _: &Multiaddr,
350    ) -> Result<THandler<Self>, ConnectionDenied> {
351        Ok(Default::default())
352    }
353
354    fn handle_established_outbound_connection(
355        &mut self,
356        _: ConnectionId,
357        _: PeerId,
358        _: &Multiaddr,
359        _: Endpoint,
360        _: PortUse,
361    ) -> Result<THandler<Self>, ConnectionDenied> {
362        Ok(Default::default())
363    }
364
365    fn on_connection_handler_event(
366        &mut self,
367        propagation_source: PeerId,
368        connection_id: ConnectionId,
369        event: THandlerOutEvent<Self>,
370    ) {
371        // We ignore successful sends or timeouts.
372        let event = match event {
373            Ok(InnerMessage::Rx(event)) => event,
374            Ok(InnerMessage::Sent) => return,
375            Err(e) => {
376                tracing::debug!("Failed to send floodsub message: {e}");
377                self.events.push_back(ToSwarm::CloseConnection {
378                    peer_id: propagation_source,
379                    connection: CloseConnection::One(connection_id),
380                });
381                return;
382            }
383        };
384
385        // Update connected peers topics
386        for subscription in event.subscriptions {
387            let remote_peer_topics = self.connected_peers
388                .get_mut(&propagation_source)
389                .expect("connected_peers is kept in sync with the peers we are connected to; we are guaranteed to only receive events from connected peers; QED");
390            match subscription.action {
391                FloodsubSubscriptionAction::Subscribe => {
392                    if !remote_peer_topics.contains(&subscription.topic) {
393                        remote_peer_topics.push(subscription.topic.clone());
394                    }
395                    self.events
396                        .push_back(ToSwarm::GenerateEvent(FloodsubEvent::Subscribed {
397                            peer_id: propagation_source,
398                            topic: subscription.topic,
399                        }));
400                }
401                FloodsubSubscriptionAction::Unsubscribe => {
402                    if let Some(pos) = remote_peer_topics
403                        .iter()
404                        .position(|t| t == &subscription.topic)
405                    {
406                        remote_peer_topics.remove(pos);
407                    }
408                    self.events
409                        .push_back(ToSwarm::GenerateEvent(FloodsubEvent::Unsubscribed {
410                            peer_id: propagation_source,
411                            topic: subscription.topic,
412                        }));
413                }
414            }
415        }
416
417        // List of messages we're going to propagate on the network.
418        let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new();
419
420        for message in event.messages {
421            // Use `self.received` to skip the messages that we have already received in the past.
422            // Note that this can result in false positives.
423            match self.received.test_and_add(&message) {
424                Ok(true) => {}         // Message  was added.
425                Ok(false) => continue, // Message already existed.
426                Err(e @ CuckooError::NotEnoughSpace) => {
427                    // Message added, but some other removed.
428                    tracing::warn!(
429                        "Message was added to 'received' Cuckoofilter but some \
430                         other message was removed as a consequence: {}",
431                        e,
432                    );
433                }
434            }
435
436            // Add the message to be dispatched to the user.
437            if self
438                .subscribed_topics
439                .iter()
440                .any(|t| message.topics.iter().any(|u| t == u))
441            {
442                let event = FloodsubEvent::Message(message.clone());
443                self.events.push_back(ToSwarm::GenerateEvent(event));
444            }
445
446            // Propagate the message to everyone else who is subscribed to any of the topics.
447            for (peer_id, subscr_topics) in self.connected_peers.iter() {
448                if peer_id == &propagation_source {
449                    continue;
450                }
451
452                // Peer must be in a communication list.
453                if !self.target_peers.contains(peer_id) {
454                    continue;
455                }
456
457                // Peer must be subscribed for the topic.
458                if !subscr_topics
459                    .iter()
460                    .any(|t| message.topics.iter().any(|u| t == u))
461                {
462                    continue;
463                }
464
465                if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
466                    rpcs_to_dispatch[pos].1.messages.push(message.clone());
467                } else {
468                    rpcs_to_dispatch.push((
469                        *peer_id,
470                        FloodsubRpc {
471                            subscriptions: Vec::new(),
472                            messages: vec![message.clone()],
473                        },
474                    ));
475                }
476            }
477        }
478
479        for (peer_id, rpc) in rpcs_to_dispatch {
480            self.events.push_back(ToSwarm::NotifyHandler {
481                peer_id,
482                handler: NotifyHandler::Any,
483                event: rpc,
484            });
485        }
486    }
487
488    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
489    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
490        if let Some(event) = self.events.pop_front() {
491            return Poll::Ready(event);
492        }
493
494        Poll::Pending
495    }
496
497    fn on_swarm_event(&mut self, event: FromSwarm) {
498        match event {
499            FromSwarm::ConnectionEstablished(connection_established) => {
500                self.on_connection_established(connection_established)
501            }
502            FromSwarm::ConnectionClosed(connection_closed) => {
503                self.on_connection_closed(connection_closed)
504            }
505            _ => {}
506        }
507    }
508}
509
/// Event passed from the `OneShotHandler` to the [`Floodsub`] behaviour.
///
/// Built from a received `FloodsubRpc` (`Rx`) or from `()` (`Sent`) through the `From`
/// implementations on this type.
#[derive(Debug)]
pub enum InnerMessage {
    /// We received an RPC from a remote.
    Rx(FloodsubRpc),
    /// We successfully sent an RPC request.
    Sent,
}
518
519impl From<FloodsubRpc> for InnerMessage {
520    #[inline]
521    fn from(rpc: FloodsubRpc) -> InnerMessage {
522        InnerMessage::Rx(rpc)
523    }
524}
525
526impl From<()> for InnerMessage {
527    #[inline]
528    fn from(_: ()) -> InnerMessage {
529        InnerMessage::Sent
530    }
531}
532
/// Event that can happen on the floodsub behaviour.
#[derive(Debug)]
pub enum FloodsubEvent {
    /// A message has been received.
    Message(FloodsubMessage),

    /// A remote subscribed to a topic.
    Subscribed {
        /// Remote that has subscribed.
        peer_id: PeerId,
        /// The topic it has subscribed to.
        topic: Topic,
    },

    /// A remote unsubscribed from a topic.
    Unsubscribed {
        /// Remote that has unsubscribed.
        peer_id: PeerId,
        /// The topic it has unsubscribed from.
        topic: Topic,
    },
}