libp2p_gossipsub/behaviour.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    cmp::{max, Ordering, Ordering::Equal},
23    collections::{BTreeSet, HashMap, HashSet, VecDeque},
24    fmt,
25    fmt::Debug,
26    net::IpAddr,
27    task::{Context, Poll},
28    time::Duration,
29};
30
31use futures::FutureExt;
32use futures_timer::Delay;
33use hashlink::LinkedHashMap;
34use libp2p_core::{
35    multiaddr::Protocol::{Ip4, Ip6},
36    transport::PortUse,
37    Endpoint, Multiaddr,
38};
39use libp2p_identity::{Keypair, PeerId};
40use libp2p_swarm::{
41    behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
42    dial_opts::DialOpts,
43    ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
44    THandlerOutEvent, ToSwarm,
45};
46use prometheus_client::registry::Registry;
47use quick_protobuf::{MessageWrite, Writer};
48use rand::{seq::SliceRandom, thread_rng};
49use web_time::{Instant, SystemTime};
50
51use crate::{
52    backoff::BackoffStorage,
53    config::{Config, ValidationMode},
54    gossip_promises::GossipPromises,
55    handler::{Handler, HandlerEvent, HandlerIn},
56    mcache::MessageCache,
57    metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
58    peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason},
59    protocol::SIGNING_PREFIX,
60    rpc::Sender,
61    rpc_proto::proto,
62    subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
63    time_cache::DuplicateCache,
64    topic::{Hasher, Topic, TopicHash},
65    transform::{DataTransform, IdentityTransform},
66    types::{
67        ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId,
68        PeerConnections, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription,
69        SubscriptionAction,
70    },
71    FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
72};
73
74#[cfg(test)]
75mod tests;
76
77/// IDONTWANT cache capacity.
78const IDONTWANT_CAP: usize = 10_000;
79
80/// IDONTWANT timeout before removal.
81const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0);
82
83/// Determines if published messages should be signed or not.
84///
85/// Without signing, a number of privacy preserving modes can be selected.
86///
87/// NOTE: The default validation settings are to require signatures. The [`ValidationMode`]
88/// should be updated in the [`Config`] to allow for unsigned messages.
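///
/// A brief illustrative sketch of choosing an authenticity mode (the keypair below is a
/// placeholder; any identity keypair works):
///
/// ```ignore
/// use libp2p_identity::Keypair;
///
/// // Sign all published messages with our identity key (what peers expect by default).
/// let signed = MessageAuthenticity::Signed(Keypair::generate_ed25519());
///
/// // Or publish unsigned, author-less messages. Peers then need a relaxed `ValidationMode`
/// // and a custom `message_id` function to avoid treating all messages as duplicates.
/// let anonymous = MessageAuthenticity::Anonymous;
/// ```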
89#[derive(Clone)]
90pub enum MessageAuthenticity {
91    /// Message signing is enabled. The author will be the owner of the key and the sequence number
92    /// will be linearly increasing.
93    Signed(Keypair),
94    /// Message signing is disabled.
95    ///
96    /// The specified [`PeerId`] will be used as the author of all published messages. The sequence
97    /// number will be randomized.
98    Author(PeerId),
99    /// Message signing is disabled.
100    ///
101    /// A random [`PeerId`] will be used when publishing each message. The sequence number will be
102    /// randomized.
103    RandomAuthor,
104    /// Message signing is disabled.
105    ///
106    /// The author of the message and the sequence numbers are excluded from the message.
107    ///
108    /// NOTE: Excluding these fields may make these messages invalid by other nodes who
109    /// enforce validation of these fields. See [`ValidationMode`] in the [`Config`]
110    /// for how to customise this for rust-libp2p gossipsub.  A custom `message_id`
111    /// function will need to be set to prevent all messages from a peer being filtered
112    /// as duplicates.
113    Anonymous,
114}
115
116impl MessageAuthenticity {
117    /// Returns true if signing is enabled.
118    pub fn is_signing(&self) -> bool {
119        matches!(self, MessageAuthenticity::Signed(_))
120    }
121
122    pub fn is_anonymous(&self) -> bool {
123        matches!(self, MessageAuthenticity::Anonymous)
124    }
125}
126
127/// Event that can be emitted by the gossipsub behaviour.
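///
/// A small sketch of how an application might branch on these events once the swarm surfaces
/// them (the surrounding event loop is assumed and not shown):
///
/// ```ignore
/// match event {
///     Event::Message { propagation_source, message_id, message } => {
///         println!("{propagation_source} delivered {message_id} on {}", message.topic);
///     }
///     Event::Subscribed { peer_id, topic } => println!("{peer_id} subscribed to {topic}"),
///     _ => {}
/// }
/// ```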
128#[derive(Debug)]
129pub enum Event {
130    /// A message has been received.
131    Message {
132        /// The peer that forwarded us this message.
133        propagation_source: PeerId,
134        /// The [`MessageId`] of the message. This should be referenced by the application when
135        /// validating a message (if required).
136        message_id: MessageId,
137        /// The decompressed message itself.
138        message: Message,
139    },
140    /// A remote subscribed to a topic.
141    Subscribed {
142        /// Remote that has subscribed.
143        peer_id: PeerId,
144        /// The topic it has subscribed to.
145        topic: TopicHash,
146    },
147    /// A remote unsubscribed from a topic.
148    Unsubscribed {
149        /// Remote that has unsubscribed.
150        peer_id: PeerId,
151        /// The topic it has subscribed from.
152        topic: TopicHash,
153    },
154    /// A peer that does not support gossipsub has connected.
155    GossipsubNotSupported { peer_id: PeerId },
156    /// A peer is not able to download messages in time.
157    SlowPeer {
158        /// The peer_id
159        peer_id: PeerId,
160        /// The types and amounts of failed messages that are occurring for this peer.
161        failed_messages: FailedMessages,
162    },
163}
164
165/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
166/// for further details.
167#[allow(clippy::large_enum_variant)]
168enum PublishConfig {
169    Signing {
170        keypair: Keypair,
171        author: PeerId,
172        inline_key: Option<Vec<u8>>,
173        last_seq_no: SequenceNumber,
174    },
175    Author(PeerId),
176    RandomAuthor,
177    Anonymous,
178}
179
180/// A strictly linearly increasing sequence number.
181///
/// We start from the current time as a unix timestamp in nanoseconds.
183#[derive(Debug)]
184struct SequenceNumber(u64);
185
186impl SequenceNumber {
187    fn new() -> Self {
188        let unix_timestamp = SystemTime::now()
189            .duration_since(SystemTime::UNIX_EPOCH)
190            .expect("time to be linear")
191            .as_nanos();
192
193        Self(unix_timestamp as u64)
194    }
195
196    fn next(&mut self) -> u64 {
197        self.0 = self
198            .0
199            .checked_add(1)
200            .expect("to not exhaust u64 space for sequence numbers");
201
202        self.0
203    }
204}
205
206impl PublishConfig {
207    pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
208        match self {
209            Self::Signing { author, .. } => Some(author),
210            Self::Author(author) => Some(author),
211            _ => None,
212        }
213    }
214}
215
216impl From<MessageAuthenticity> for PublishConfig {
217    fn from(authenticity: MessageAuthenticity) -> Self {
218        match authenticity {
219            MessageAuthenticity::Signed(keypair) => {
220                let public_key = keypair.public();
221                let key_enc = public_key.encode_protobuf();
222                let key = if key_enc.len() <= 42 {
                    // The public key can be inlined in [`rpc_proto::proto::Message::from`], so we
                    // don't include it specifically in the
                    // [`rpc_proto::proto::Message::key`] field.
226                    None
227                } else {
228                    // Include the protobuf encoding of the public key in the message.
229                    Some(key_enc)
230                };
231
232                PublishConfig::Signing {
233                    keypair,
234                    author: public_key.to_peer_id(),
235                    inline_key: key,
236                    last_seq_no: SequenceNumber::new(),
237                }
238            }
239            MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
240            MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
241            MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
242        }
243    }
244}
245
246/// Network behaviour that handles the gossipsub protocol.
247///
248/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`Config`] instance. If
249/// message signing is disabled, the [`ValidationMode`] in the config should be adjusted to an
250/// appropriate level to accept unsigned messages.
251///
252/// The DataTransform trait allows applications to optionally add extra encoding/decoding
253/// functionality to the underlying messages. This is intended for custom compression algorithms.
254///
255/// The TopicSubscriptionFilter allows applications to implement specific filters on topics to
256/// prevent unwanted messages being propagated and evaluated.
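///
/// A minimal construction sketch using the default `IdentityTransform` and
/// `AllowAllSubscriptionFilter` type parameters (the keypair is a placeholder):
///
/// ```ignore
/// use libp2p_identity::Keypair;
///
/// let keypair = Keypair::generate_ed25519();
/// let behaviour: Behaviour = Behaviour::new(
///     MessageAuthenticity::Signed(keypair),
///     Config::default(),
/// )
/// .expect("valid configuration");
/// ```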
257pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
258    /// Configuration providing gossipsub performance parameters.
259    config: Config,
260
261    /// Events that need to be yielded to the outside when polling.
262    events: VecDeque<ToSwarm<Event, HandlerIn>>,
263
264    /// Information used for publishing messages.
265    publish_config: PublishConfig,
266
267    /// An LRU Time cache for storing seen messages (based on their ID). This cache prevents
268    /// duplicates from being propagated to the application and on the network.
269    duplicate_cache: DuplicateCache<MessageId>,
270
271    /// A set of connected peers, indexed by their [`PeerId`] tracking both the [`PeerKind`] and
272    /// the set of [`ConnectionId`]s.
273    connected_peers: HashMap<PeerId, PeerConnections>,
274
275    /// A set of all explicit peers. These are peers that remain connected and we unconditionally
276    /// forward messages to, outside of the scoring system.
277    explicit_peers: HashSet<PeerId>,
278
279    /// A list of peers that have been blacklisted by the user.
280    /// Messages are not sent to and are rejected from these peers.
281    blacklisted_peers: HashSet<PeerId>,
282
283    /// Overlay network of connected peers - Maps topics to connected gossipsub peers.
284    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,
285
286    /// Map of topics to list of peers that we publish to, but don't subscribe to.
287    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,
288
289    /// The last publish time for fanout topics.
290    fanout_last_pub: HashMap<TopicHash, Instant>,
291
292    /// Storage for backoffs
293    backoffs: BackoffStorage,
294
295    /// Message cache for the last few heartbeats.
296    mcache: MessageCache,
297
298    /// Heartbeat interval stream.
299    heartbeat: Delay,
300
301    /// Number of heartbeats since the beginning of time; this allows us to amortize some resource
    /// clean up -- e.g. backoff clean up.
303    heartbeat_ticks: u64,
304
305    /// We remember all peers we found through peer exchange, since those peers are not considered
306    /// as safe as randomly discovered outbound peers. This behaviour diverges from the go
    /// implementation to avoid possible love bombing attacks in PX. When disconnecting, peers will
    /// be removed from this list, which may result in a true outbound rediscovery.
309    px_peers: HashSet<PeerId>,
310
311    /// Set of connected outbound peers (we only consider true outbound peers found through
312    /// discovery and not by PX).
313    outbound_peers: HashSet<PeerId>,
314
315    /// Stores optional peer score data together with thresholds, decay interval and gossip
316    /// promises.
317    peer_score: Option<(PeerScore, PeerScoreThresholds, Delay)>,
318
319    /// Counts the number of `IHAVE` received from each peer since the last heartbeat.
320    count_received_ihave: HashMap<PeerId, usize>,
321
    /// Counts the number of `IWANT` that we sent to each peer since the last heartbeat.
323    count_sent_iwant: HashMap<PeerId, usize>,
324
325    /// Short term cache for published message ids. This is used for penalizing peers sending
326    /// our own messages back if the messages are anonymous or use a random author.
327    published_message_ids: DuplicateCache<MessageId>,
328
329    /// The filter used to handle message subscriptions.
330    subscription_filter: F,
331
332    /// A general transformation function that can be applied to data received from the wire before
333    /// calculating the message-id and sending to the application. This is designed to allow the
334    /// user to implement arbitrary topic-based compression algorithms.
335    data_transform: D,
336
337    /// Keep track of a set of internal metrics relating to gossipsub.
338    metrics: Option<Metrics>,
339
340    /// Tracks the numbers of failed messages per peer-id.
341    failed_messages: HashMap<PeerId, FailedMessages>,
342
343    /// Tracks recently sent `IWANT` messages and checks if peers respond to them.
344    gossip_promises: GossipPromises,
345}
346
347impl<D, F> Behaviour<D, F>
348where
349    D: DataTransform + Default,
350    F: TopicSubscriptionFilter + Default,
351{
352    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
353    /// [`Config`]. This has no subscription filter and uses no compression.
354    pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
355        Self::new_with_subscription_filter_and_transform(
356            privacy,
357            config,
358            None,
359            F::default(),
360            D::default(),
361        )
362    }
363
364    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
365    /// [`Config`]. This has no subscription filter and uses no compression.
366    /// Metrics can be evaluated by passing a reference to a [`Registry`].
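    ///
    /// A rough wiring sketch; `keypair` is assumed to exist and `MetricsConfig::default()` is
    /// used as a placeholder for application-specific settings:
    ///
    /// ```ignore
    /// use prometheus_client::registry::Registry;
    ///
    /// let mut registry = Registry::default();
    /// let behaviour: Behaviour = Behaviour::new_with_metrics(
    ///     MessageAuthenticity::Signed(keypair),
    ///     Config::default(),
    ///     &mut registry,
    ///     MetricsConfig::default(),
    /// )
    /// .expect("valid configuration and metrics setup");
    /// ```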
367    pub fn new_with_metrics(
368        privacy: MessageAuthenticity,
369        config: Config,
370        metrics_registry: &mut Registry,
371        metrics_config: MetricsConfig,
372    ) -> Result<Self, &'static str> {
373        Self::new_with_subscription_filter_and_transform(
374            privacy,
375            config,
376            Some((metrics_registry, metrics_config)),
377            F::default(),
378            D::default(),
379        )
380    }
381}
382
383impl<D, F> Behaviour<D, F>
384where
385    D: DataTransform + Default,
386    F: TopicSubscriptionFilter,
387{
388    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
389    /// [`Config`] and a custom subscription filter.
390    pub fn new_with_subscription_filter(
391        privacy: MessageAuthenticity,
392        config: Config,
393        metrics: Option<(&mut Registry, MetricsConfig)>,
394        subscription_filter: F,
395    ) -> Result<Self, &'static str> {
396        Self::new_with_subscription_filter_and_transform(
397            privacy,
398            config,
399            metrics,
400            subscription_filter,
401            D::default(),
402        )
403    }
404}
405
406impl<D, F> Behaviour<D, F>
407where
408    D: DataTransform,
409    F: TopicSubscriptionFilter + Default,
410{
411    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
412    /// [`Config`] and a custom data transform.
413    pub fn new_with_transform(
414        privacy: MessageAuthenticity,
415        config: Config,
416        metrics: Option<(&mut Registry, MetricsConfig)>,
417        data_transform: D,
418    ) -> Result<Self, &'static str> {
419        Self::new_with_subscription_filter_and_transform(
420            privacy,
421            config,
422            metrics,
423            F::default(),
424            data_transform,
425        )
426    }
427}
428
429impl<D, F> Behaviour<D, F>
430where
431    D: DataTransform,
432    F: TopicSubscriptionFilter,
433{
434    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
435    /// [`Config`] and a custom subscription filter and data transform.
436    pub fn new_with_subscription_filter_and_transform(
437        privacy: MessageAuthenticity,
438        config: Config,
439        metrics: Option<(&mut Registry, MetricsConfig)>,
440        subscription_filter: F,
441        data_transform: D,
442    ) -> Result<Self, &'static str> {
443        // Set up the router given the configuration settings.
444
445        // We do not allow configurations where a published message would also be rejected if it
446        // were received locally.
447        validate_config(&privacy, config.validation_mode())?;
448
449        Ok(Behaviour {
450            metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
451            events: VecDeque::new(),
452            publish_config: privacy.into(),
453            duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
454            explicit_peers: HashSet::new(),
455            blacklisted_peers: HashSet::new(),
456            mesh: HashMap::new(),
457            fanout: HashMap::new(),
458            fanout_last_pub: HashMap::new(),
459            backoffs: BackoffStorage::new(
460                &config.prune_backoff(),
461                config.heartbeat_interval(),
462                config.backoff_slack(),
463            ),
464            mcache: MessageCache::new(config.history_gossip(), config.history_length()),
465            heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
466            heartbeat_ticks: 0,
467            px_peers: HashSet::new(),
468            outbound_peers: HashSet::new(),
469            peer_score: None,
470            count_received_ihave: HashMap::new(),
471            count_sent_iwant: HashMap::new(),
472            connected_peers: HashMap::new(),
473            published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
474            config,
475            subscription_filter,
476            data_transform,
477            failed_messages: Default::default(),
478            gossip_promises: Default::default(),
479        })
480    }
481}
482
483impl<D, F> Behaviour<D, F>
484where
485    D: DataTransform + Send + 'static,
486    F: TopicSubscriptionFilter + Send + 'static,
487{
488    /// Lists the hashes of the topics we are currently subscribed to.
489    pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
490        self.mesh.keys()
491    }
492
493    /// Lists all mesh peers for a certain topic hash.
494    pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
495        self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
496    }
497
498    pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
499        let mut res = BTreeSet::new();
500        for peers in self.mesh.values() {
501            res.extend(peers);
502        }
503        res.into_iter()
504    }
505
506    /// Lists all known peers and their associated subscribed topics.
507    pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
508        self.connected_peers
509            .iter()
510            .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
511    }
512
513    /// Lists all known peers and their associated protocol.
514    pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
515        self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
516    }
517
518    /// Returns the gossipsub score for a given peer, if one exists.
519    pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
520        self.peer_score
521            .as_ref()
522            .map(|(score, ..)| score.score(peer_id))
523    }
524
525    /// Subscribe to a topic.
526    ///
527    /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already
528    /// subscribed.
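    ///
    /// A small illustrative sketch (assumes a mutable `behaviour` already exists):
    ///
    /// ```ignore
    /// let topic = IdentTopic::new("example-topic");
    /// match behaviour.subscribe(&topic) {
    ///     Ok(true) => println!("newly subscribed"),
    ///     Ok(false) => println!("already subscribed"),
    ///     Err(e) => eprintln!("subscription rejected: {e:?}"),
    /// }
    /// ```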
529    pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
530        tracing::debug!(%topic, "Subscribing to topic");
531        let topic_hash = topic.hash();
532        if !self.subscription_filter.can_subscribe(&topic_hash) {
533            return Err(SubscriptionError::NotAllowed);
534        }
535
536        if self.mesh.contains_key(&topic_hash) {
537            tracing::debug!(%topic, "Topic is already in the mesh");
538            return Ok(false);
539        }
540
541        // send subscription request to all peers
542        for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
543            tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
544            let event = RpcOut::Subscribe(topic_hash.clone());
545            self.send_message(peer_id, event);
546        }
547
548        // call JOIN(topic)
549        // this will add new peers to the mesh for the topic
550        self.join(&topic_hash);
551        tracing::debug!(%topic, "Subscribed to topic");
552        Ok(true)
553    }
554
555    /// Unsubscribes from a topic.
556    ///
557    /// Returns `true` if we were subscribed to this topic.
558    pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
559        tracing::debug!(%topic, "Unsubscribing from topic");
560        let topic_hash = topic.hash();
561
562        if !self.mesh.contains_key(&topic_hash) {
563            tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
564            // we are not subscribed
565            return false;
566        }
567
568        // announce to all peers
569        for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
570            tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
571            let event = RpcOut::Unsubscribe(topic_hash.clone());
572            self.send_message(peer, event);
573        }
574
575        // call LEAVE(topic)
576        // this will remove the topic from the mesh
577        self.leave(&topic_hash);
578
579        tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
580        true
581    }
582
    /// Publishes a message to the network on the given topic.
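    ///
    /// A minimal illustrative sketch (assumes `behaviour` is subscribed to the topic, or has
    /// fanout peers for it):
    ///
    /// ```ignore
    /// let topic = IdentTopic::new("example-topic");
    /// match behaviour.publish(topic, b"hello world".to_vec()) {
    ///     Ok(message_id) => println!("published {message_id}"),
    ///     Err(PublishError::InsufficientPeers) => println!("no peers to publish to yet"),
    ///     Err(e) => eprintln!("publish failed: {e:?}"),
    /// }
    /// ```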
584    pub fn publish(
585        &mut self,
586        topic: impl Into<TopicHash>,
587        data: impl Into<Vec<u8>>,
588    ) -> Result<MessageId, PublishError> {
589        let data = data.into();
590        let topic = topic.into();
591
592        // Transform the data before building a raw_message.
593        let transformed_data = self
594            .data_transform
595            .outbound_transform(&topic, data.clone())?;
596
597        // check that the size doesn't exceed the max transmission size.
598        if transformed_data.len() > self.config.max_transmit_size() {
599            return Err(PublishError::MessageTooLarge);
600        }
601
602        let raw_message = self.build_raw_message(topic, transformed_data)?;
603
604        // calculate the message id from the un-transformed data
605        let msg_id = self.config.message_id(&Message {
606            source: raw_message.source,
607            data, // the uncompressed form
608            sequence_number: raw_message.sequence_number,
609            topic: raw_message.topic.clone(),
610        });
611
        // Check if the message has been published before.
613        if self.duplicate_cache.contains(&msg_id) {
614            // This message has already been seen. We don't re-publish messages that have already
615            // been published on the network.
616            tracing::warn!(
617                message=%msg_id,
618                "Not publishing a message that has already been published"
619            );
620            return Err(PublishError::Duplicate);
621        }
622
623        tracing::trace!(message=%msg_id, "Publishing message");
624
625        let topic_hash = raw_message.topic.clone();
626
627        let mut peers_on_topic = self
628            .connected_peers
629            .iter()
630            .filter(|(_, p)| p.topics.contains(&topic_hash))
631            .map(|(peer_id, _)| peer_id)
632            .peekable();
633
634        if peers_on_topic.peek().is_none() {
635            return Err(PublishError::InsufficientPeers);
636        }
637
638        let mut recipient_peers = HashSet::new();
639        if self.config.flood_publish() {
640            // Forward to all peers above score and all explicit peers
641            recipient_peers.extend(peers_on_topic.filter(|p| {
642                self.explicit_peers.contains(*p)
643                    || !self.score_below_threshold(p, |ts| ts.publish_threshold).0
644            }));
645        } else {
646            match self.mesh.get(&topic_hash) {
647                // Mesh peers
648                Some(mesh_peers) => {
649                    // We have a mesh set. We want to make sure to publish to at least `mesh_n`
650                    // peers (if possible).
651                    let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len());
652
653                    if needed_extra_peers > 0 {
654                        // We don't have `mesh_n` peers in our mesh, we will randomly select extras
655                        // and publish to them.
656
                        // Get a random set of peers that are appropriate to send messages to.
658                        let peer_list = get_random_peers(
659                            &self.connected_peers,
660                            &topic_hash,
661                            needed_extra_peers,
662                            |peer| {
663                                !mesh_peers.contains(peer)
664                                    && !self.explicit_peers.contains(peer)
665                                    && !self
666                                        .score_below_threshold(peer, |pst| pst.publish_threshold)
667                                        .0
668                            },
669                        );
670                        recipient_peers.extend(peer_list);
671                    }
672
673                    recipient_peers.extend(mesh_peers);
674                }
675                // Gossipsub peers
676                None => {
677                    tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
678                    // `fanout_peers` is always non-empty if it's `Some`.
679                    let fanout_peers = self
680                        .fanout
681                        .get(&topic_hash)
682                        .filter(|peers| !peers.is_empty());
683                    // If we have fanout peers add them to the map.
684                    if let Some(peers) = fanout_peers {
685                        for peer in peers {
686                            recipient_peers.insert(*peer);
687                        }
688                    } else {
689                        // We have no fanout peers, select mesh_n of them and add them to the fanout
690                        let mesh_n = self.config.mesh_n();
691                        let new_peers =
692                            get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
693                                |p| {
694                                    !self.explicit_peers.contains(p)
695                                        && !self
696                                            .score_below_threshold(p, |pst| pst.publish_threshold)
697                                            .0
698                                }
699                            });
700                        // Add the new peers to the fanout and recipient peers
701                        self.fanout.insert(topic_hash.clone(), new_peers.clone());
702                        for peer in new_peers {
703                            tracing::debug!(%peer, "Peer added to fanout");
704                            recipient_peers.insert(peer);
705                        }
706                    }
707                    // We are publishing to fanout peers - update the time we published
708                    self.fanout_last_pub
709                        .insert(topic_hash.clone(), Instant::now());
710                }
711            }
712
713            // Explicit peers that are part of the topic
714            recipient_peers
715                .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));
716
717            // Floodsub peers
718            for (peer, connections) in &self.connected_peers {
719                if connections.kind == PeerKind::Floodsub
720                    && !self
721                        .score_below_threshold(peer, |ts| ts.publish_threshold)
722                        .0
723                {
724                    recipient_peers.insert(*peer);
725                }
726            }
727        }
728
729        // If the message isn't a duplicate and we have sent it to some peers add it to the
730        // duplicate cache and memcache.
731        self.duplicate_cache.insert(msg_id.clone());
732        self.mcache.put(&msg_id, raw_message.clone());
733
734        // If the message is anonymous or has a random author add it to the published message ids
735        // cache.
736        if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
737            if !self.config.allow_self_origin() {
738                self.published_message_ids.insert(msg_id.clone());
739            }
740        }
741
742        // Send to peers we know are subscribed to the topic.
743        let mut publish_failed = true;
744        for peer_id in recipient_peers.iter() {
745            tracing::trace!(peer=%peer_id, "Sending message to peer");
746            if self.send_message(
747                *peer_id,
748                RpcOut::Publish {
749                    message: raw_message.clone(),
750                    timeout: Delay::new(self.config.publish_queue_duration()),
751                },
752            ) {
753                publish_failed = false
754            }
755        }
756
757        if recipient_peers.is_empty() {
758            return Err(PublishError::InsufficientPeers);
759        }
760
761        if publish_failed {
762            return Err(PublishError::AllQueuesFull(recipient_peers.len()));
763        }
764
765        // Broadcast IDONTWANT messages
766        if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
767            && self.config.idontwant_on_publish()
768        {
769            self.send_idontwant(&raw_message, &msg_id, raw_message.source.as_ref());
770        }
771
772        tracing::debug!(message=%msg_id, "Published message");
773
774        if let Some(metrics) = self.metrics.as_mut() {
775            metrics.register_published_message(&topic_hash);
776        }
777
778        Ok(msg_id)
779    }
780
    /// This function should be called when [`Config::validate_messages()`] is `true` after
    /// the message has been validated by the caller. Messages are stored in the
    /// [`MessageCache`] and validation is expected to be fast enough that the messages should
    /// still exist in the cache.
784    /// There are three possible validation outcomes and the outcome is given in acceptance.
785    ///
786    /// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the
787    /// network. The `propagation_source` parameter indicates who the message was received by and
788    /// will not be forwarded back to that peer.
789    ///
    /// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache
    /// and the P₄ penalty will be applied to the `propagation_source`.
    ///
    /// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache
    /// but no P₄ penalty will be applied.
795    ///
    /// This function will return true if the message was found in the cache and false if it was
    /// not in the cache anymore.
798    ///
799    /// This should only be called once per message.
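    ///
    /// A sketch of the typical flow when message validation is enabled (the application-level
    /// `is_valid` check stands in for real validation logic):
    ///
    /// ```ignore
    /// // After receiving Event::Message { propagation_source, message_id, message }:
    /// let acceptance = if is_valid(&message) {
    ///     MessageAcceptance::Accept
    /// } else {
    ///     MessageAcceptance::Reject
    /// };
    /// behaviour.report_message_validation_result(&message_id, &propagation_source, acceptance);
    /// ```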
800    pub fn report_message_validation_result(
801        &mut self,
802        msg_id: &MessageId,
803        propagation_source: &PeerId,
804        acceptance: MessageAcceptance,
805    ) -> bool {
806        let reject_reason = match acceptance {
807            MessageAcceptance::Accept => {
808                let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
809                    Some((raw_message, originating_peers)) => {
810                        (raw_message.clone(), originating_peers)
811                    }
812                    None => {
813                        tracing::warn!(
814                            message=%msg_id,
815                            "Message not in cache. Ignoring forwarding"
816                        );
817                        if let Some(metrics) = self.metrics.as_mut() {
818                            metrics.memcache_miss();
819                        }
820                        return false;
821                    }
822                };
823
824                if let Some(metrics) = self.metrics.as_mut() {
825                    metrics.register_msg_validation(&raw_message.topic, &acceptance);
826                }
827
828                self.forward_msg(
829                    msg_id,
830                    raw_message,
831                    Some(propagation_source),
832                    originating_peers,
833                );
834                return true;
835            }
836            MessageAcceptance::Reject => RejectReason::ValidationFailed,
837            MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
838        };
839
840        if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
841            if let Some(metrics) = self.metrics.as_mut() {
842                metrics.register_msg_validation(&raw_message.topic, &acceptance);
843            }
844
845            // Tell peer_score about reject
846            // Reject the original source, and any duplicates we've seen from other peers.
847            if let Some((peer_score, ..)) = &mut self.peer_score {
848                peer_score.reject_message(
849                    propagation_source,
850                    msg_id,
851                    &raw_message.topic,
852                    reject_reason,
853                );
854                for peer in originating_peers.iter() {
855                    peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
856                }
857            }
858            true
859        } else {
860            tracing::warn!(message=%msg_id, "Rejected message not in cache");
861            false
862        }
863    }
864
865    /// Adds a new peer to the list of explicitly connected peers.
866    pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
867        tracing::debug!(peer=%peer_id, "Adding explicit peer");
868
869        self.explicit_peers.insert(*peer_id);
870
871        self.check_explicit_peer_connection(peer_id);
872    }
873
    /// This removes the peer from the explicitly connected peers; note that this does not
    /// disconnect the peer.
876    pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
877        tracing::debug!(peer=%peer_id, "Removing explicit peer");
878        self.explicit_peers.remove(peer_id);
879    }
880
881    /// Blacklists a peer. All messages from this peer will be rejected and any message that was
882    /// created by this peer will be rejected.
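    ///
    /// Illustrative only; `peer_id` is assumed to come from the application:
    ///
    /// ```ignore
    /// behaviour.blacklist_peer(&peer_id);
    /// // ...and later, if the peer is trusted again:
    /// behaviour.remove_blacklisted_peer(&peer_id);
    /// ```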
883    pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
884        if self.blacklisted_peers.insert(*peer_id) {
885            tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
886        }
887    }
888
889    /// Removes a peer from the blacklist if it has previously been blacklisted.
890    pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
891        if self.blacklisted_peers.remove(peer_id) {
892            tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
893        }
894    }
895
    /// Activates the peer scoring system with the given parameters. Returns an error if the
    /// parameters are not valid or if peer scoring has already been activated.
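    ///
    /// A minimal activation sketch using the default parameters and thresholds (real
    /// deployments normally tune these for their topics and network):
    ///
    /// ```ignore
    /// behaviour
    ///     .with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default())
    ///     .expect("valid score parameters, set only once");
    /// ```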
899    pub fn with_peer_score(
900        &mut self,
901        params: PeerScoreParams,
902        threshold: PeerScoreThresholds,
903    ) -> Result<(), String> {
904        self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
905    }
906
    /// Activates the peer scoring system with the given parameters and a message delivery time
    /// callback. Returns an error if the parameters are invalid or have already been set.
909    pub fn with_peer_score_and_message_delivery_time_callback(
910        &mut self,
911        params: PeerScoreParams,
912        threshold: PeerScoreThresholds,
913        callback: Option<fn(&PeerId, &TopicHash, f64)>,
914    ) -> Result<(), String> {
915        params.validate()?;
916        threshold.validate()?;
917
918        if self.peer_score.is_some() {
919            return Err("Peer score set twice".into());
920        }
921
922        let interval = Delay::new(params.decay_interval);
923        let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
924        self.peer_score = Some((peer_score, threshold, interval));
925        Ok(())
926    }
927
928    /// Sets scoring parameters for a topic.
929    ///
930    /// The [`Self::with_peer_score()`] must first be called to initialise peer scoring.
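    ///
    /// A small sketch (assumes peer scoring was activated via [`Self::with_peer_score()`]):
    ///
    /// ```ignore
    /// let topic = IdentTopic::new("example-topic");
    /// behaviour
    ///     .set_topic_params(topic, TopicScoreParams::default())
    ///     .expect("peer scoring is active");
    /// ```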
931    pub fn set_topic_params<H: Hasher>(
932        &mut self,
933        topic: Topic<H>,
934        params: TopicScoreParams,
935    ) -> Result<(), &'static str> {
936        if let Some((peer_score, ..)) = &mut self.peer_score {
937            peer_score.set_topic_params(topic.hash(), params);
938            Ok(())
939        } else {
940            Err("Peer score must be initialised with `with_peer_score()`")
941        }
942    }
943
    /// Returns the scoring parameters for a topic, if they exist.
945    pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
946        self.peer_score.as_ref()?.0.get_topic_params(&topic.hash())
947    }
948
    /// Sets the application-specific score for a peer. Returns true if scoring is active and
    /// the peer is connected or its score has not yet expired; returns false otherwise.
951    pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
952        if let Some((peer_score, ..)) = &mut self.peer_score {
953            peer_score.set_application_score(peer_id, new_score)
954        } else {
955            false
956        }
957    }
958
959    /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages.
960    fn join(&mut self, topic_hash: &TopicHash) {
961        tracing::debug!(topic=%topic_hash, "Running JOIN for topic");
962
963        // if we are already in the mesh, return
964        if self.mesh.contains_key(topic_hash) {
965            tracing::debug!(topic=%topic_hash, "JOIN: The topic is already in the mesh, ignoring JOIN");
966            return;
967        }
968
969        let mut added_peers = HashSet::new();
970
971        if let Some(m) = self.metrics.as_mut() {
972            m.joined(topic_hash)
973        }
974
975        // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do,
976        // removing the fanout entry.
977        if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
978            tracing::debug!(
979                topic=%topic_hash,
980                "JOIN: Removing peers from the fanout for topic"
981            );
982
983            // remove explicit peers, peers with negative scores, and backoffed peers
984            peers.retain(|p| {
985                !self.explicit_peers.contains(p)
986                    && !self.score_below_threshold(p, |_| 0.0).0
987                    && !self.backoffs.is_backoff_with_slack(topic_hash, p)
988            });
989
990            // Add up to mesh_n of them to the mesh
991            // NOTE: These aren't randomly added, currently FIFO
992            let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
993            tracing::debug!(
994                topic=%topic_hash,
995                "JOIN: Adding {:?} peers from the fanout for topic",
996                add_peers
997            );
998            added_peers.extend(peers.iter().take(add_peers));
999
1000            self.mesh.insert(
1001                topic_hash.clone(),
1002                peers.into_iter().take(add_peers).collect(),
1003            );
1004
1005            // remove the last published time
1006            self.fanout_last_pub.remove(topic_hash);
1007        }
1008
        let fanout_added = added_peers.len();
        if let Some(m) = self.metrics.as_mut() {
            m.peers_included(topic_hash, Inclusion::Fanout, fanout_added)
1012        }
1013
1014        // check if we need to get more peers, which we randomly select
1015        if added_peers.len() < self.config.mesh_n() {
1016            // get the peers
1017            let new_peers = get_random_peers(
1018                &self.connected_peers,
1019                topic_hash,
1020                self.config.mesh_n() - added_peers.len(),
1021                |peer| {
1022                    !added_peers.contains(peer)
1023                        && !self.explicit_peers.contains(peer)
1024                        && !self.score_below_threshold(peer, |_| 0.0).0
1025                        && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
1026                },
1027            );
1028            added_peers.extend(new_peers.clone());
1029            // add them to the mesh
1030            tracing::debug!(
1031                "JOIN: Inserting {:?} random peers into the mesh",
1032                new_peers.len()
1033            );
1034            let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default();
1035            mesh_peers.extend(new_peers);
1036        }
1037
        let random_added = added_peers.len() - fanout_added;
1039        if let Some(m) = self.metrics.as_mut() {
1040            m.peers_included(topic_hash, Inclusion::Random, random_added)
1041        }
1042
1043        for peer_id in added_peers {
1044            // Send a GRAFT control message
1045            tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
1046            if let Some((peer_score, ..)) = &mut self.peer_score {
1047                peer_score.graft(&peer_id, topic_hash.clone());
1048            }
1049            self.send_message(
1050                peer_id,
1051                RpcOut::Graft(Graft {
1052                    topic_hash: topic_hash.clone(),
1053                }),
1054            );
1055
1056            // If the peer did not previously exist in any mesh, inform the handler
1057            peer_added_to_mesh(
1058                peer_id,
1059                vec![topic_hash],
1060                &self.mesh,
1061                &mut self.events,
1062                &self.connected_peers,
1063            );
1064        }
1065
1066        let mesh_peers = self.mesh_peers(topic_hash).count();
1067        if let Some(m) = self.metrics.as_mut() {
1068            m.set_mesh_peers(topic_hash, mesh_peers)
1069        }
1070
1071        tracing::debug!(topic=%topic_hash, "Completed JOIN for topic");
1072    }
1073
1074    /// Creates a PRUNE gossipsub action.
1075    fn make_prune(
1076        &mut self,
1077        topic_hash: &TopicHash,
1078        peer: &PeerId,
1079        do_px: bool,
1080        on_unsubscribe: bool,
1081    ) -> Prune {
1082        if let Some((peer_score, ..)) = &mut self.peer_score {
1083            peer_score.prune(peer, topic_hash.clone());
1084        }
1085
1086        match self.connected_peers.get(peer).map(|v| &v.kind) {
1087            Some(PeerKind::Floodsub) => {
1088                tracing::error!("Attempted to prune a Floodsub peer");
1089            }
1090            Some(PeerKind::Gossipsub) => {
1091                // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
1092                return Prune {
1093                    topic_hash: topic_hash.clone(),
1094                    peers: Vec::new(),
1095                    backoff: None,
1096                };
1097            }
1098            None => {
1099                tracing::error!("Attempted to Prune an unknown peer");
1100            }
            _ => {} // Gossipsub v1.1 peers perform the `Prune`
1102        }
1103
1104        // Select peers for peer exchange
1105        let peers = if do_px {
1106            get_random_peers(
1107                &self.connected_peers,
1108                topic_hash,
1109                self.config.prune_peers(),
1110                |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
1111            )
1112            .into_iter()
1113            .map(|p| PeerInfo { peer_id: Some(p) })
1114            .collect()
1115        } else {
1116            Vec::new()
1117        };
1118
1119        let backoff = if on_unsubscribe {
1120            self.config.unsubscribe_backoff()
1121        } else {
1122            self.config.prune_backoff()
1123        };
1124
1125        // update backoff
1126        self.backoffs.update_backoff(topic_hash, peer, backoff);
1127
1128        Prune {
1129            topic_hash: topic_hash.clone(),
1130            peers,
1131            backoff: Some(backoff.as_secs()),
1132        }
1133    }
1134
1135    /// Gossipsub LEAVE(topic) - Notifies mesh\[topic\] peers with PRUNE messages.
1136    fn leave(&mut self, topic_hash: &TopicHash) {
1137        tracing::debug!(topic=%topic_hash, "Running LEAVE for topic");
1138
1139        // If our mesh contains the topic, send prune to peers and delete it from the mesh
1140        if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1141            if let Some(m) = self.metrics.as_mut() {
1142                m.left(topic_hash)
1143            }
1144            for peer_id in peers {
1145                // Send a PRUNE control message
1146                tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
1147
1148                let on_unsubscribe = true;
1149                let prune =
1150                    self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
1151                self.send_message(peer_id, RpcOut::Prune(prune));
1152
                // If the peer is no longer in any mesh, inform the handler
1154                peer_removed_from_mesh(
1155                    peer_id,
1156                    topic_hash,
1157                    &self.mesh,
1158                    &mut self.events,
1159                    &self.connected_peers,
1160                );
1161            }
1162        }
1163        tracing::debug!(topic=%topic_hash, "Completed LEAVE for topic");
1164    }
1165
1166    /// Checks if the given peer is still connected and if not dials the peer again.
1167    fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1168        if !self.connected_peers.contains_key(peer_id) {
1169            // Connect to peer
1170            tracing::debug!(peer=%peer_id, "Connecting to explicit peer");
1171            self.events.push_back(ToSwarm::Dial {
1172                opts: DialOpts::peer_id(*peer_id).build(),
1173            });
1174        }
1175    }
1176
    /// Determines if a peer's score is below a given threshold from [`PeerScoreThresholds`],
    /// chosen via the `threshold` parameter.
1179    fn score_below_threshold(
1180        &self,
1181        peer_id: &PeerId,
1182        threshold: impl Fn(&PeerScoreThresholds) -> f64,
1183    ) -> (bool, f64) {
1184        Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold)
1185    }
1186
1187    fn score_below_threshold_from_scores(
1188        peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay)>,
1189        peer_id: &PeerId,
1190        threshold: impl Fn(&PeerScoreThresholds) -> f64,
1191    ) -> (bool, f64) {
1192        if let Some((peer_score, thresholds, ..)) = peer_score {
1193            let score = peer_score.score(peer_id);
1194            if score < threshold(thresholds) {
1195                return (true, score);
1196            }
1197            (false, score)
1198        } else {
1199            (false, 0.0)
1200        }
1201    }
1202
1203    /// Handles an IHAVE control message. Checks our cache of messages. If the message is unknown,
1204    /// requests it with an IWANT control message.
1205    fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1206        // We ignore IHAVE gossip from any peer whose score is below the gossip threshold
1207        if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1208            tracing::debug!(
1209                peer=%peer_id,
1210                %score,
1211                "IHAVE: ignoring peer with score below threshold"
1212            );
1213            return;
1214        }
1215
1216        // IHAVE flood protection
1217        let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
1218        *peer_have += 1;
1219        if *peer_have > self.config.max_ihave_messages() {
1220            tracing::debug!(
1221                peer=%peer_id,
1222                "IHAVE: peer has advertised too many times ({}) within this heartbeat \
1223            interval; ignoring",
1224                *peer_have
1225            );
1226            return;
1227        }
1228
1229        if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1230            if *iasked >= self.config.max_ihave_length() {
1231                tracing::debug!(
1232                    peer=%peer_id,
1233                    "IHAVE: peer has already advertised too many messages ({}); ignoring",
1234                    *iasked
1235                );
1236                return;
1237            }
1238        }
1239
1240        tracing::trace!(peer=%peer_id, "Handling IHAVE for peer");
1241
1242        let mut iwant_ids = HashSet::new();
1243
1244        let want_message = |id: &MessageId| {
1245            if self.duplicate_cache.contains(id) {
1246                return false;
1247            }
1248
1249            !self.gossip_promises.contains(id)
1250        };
1251
1252        for (topic, ids) in ihave_msgs {
1253            // only process the message if we are subscribed
1254            if !self.mesh.contains_key(&topic) {
1255                tracing::debug!(
1256                    %topic,
1257                    "IHAVE: Ignoring IHAVE - Not subscribed to topic"
1258                );
1259                continue;
1260            }
1261
1262            for id in ids.into_iter().filter(want_message) {
1263                // have not seen this message and are not currently requesting it
1264                if iwant_ids.insert(id) {
1265                    // Register the IWANT metric
1266                    if let Some(metrics) = self.metrics.as_mut() {
1267                        metrics.register_iwant(&topic);
1268                    }
1269                }
1270            }
1271        }
1272
1273        if !iwant_ids.is_empty() {
1274            let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1275            let mut iask = iwant_ids.len();
1276            if *iasked + iask > self.config.max_ihave_length() {
1277                iask = self.config.max_ihave_length().saturating_sub(*iasked);
1278            }
1279
1280            // Send the list of IWANT control messages
1281            tracing::debug!(
1282                peer=%peer_id,
1283                "IHAVE: Asking for {} out of {} messages from peer",
1284                iask,
1285                iwant_ids.len()
1286            );
1287
1288            // Ask in random order
1289            let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
1290            let mut rng = thread_rng();
1291            iwant_ids_vec.partial_shuffle(&mut rng, iask);
1292
1293            iwant_ids_vec.truncate(iask);
1294            *iasked += iask;
1295
1296            self.gossip_promises.add_promise(
1297                *peer_id,
1298                &iwant_ids_vec,
1299                Instant::now() + self.config.iwant_followup_time(),
1300            );
1301            tracing::trace!(
1302                peer=%peer_id,
1303                "IHAVE: Asking for the following messages from peer: {:?}",
1304                iwant_ids_vec
1305            );
1306
1307            self.send_message(
1308                *peer_id,
1309                RpcOut::IWant(IWant {
1310                    message_ids: iwant_ids_vec,
1311                }),
1312            );
1313        }
1314        tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
1315    }
1316
1317    /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is
1318    /// forwarded to the requesting peer.
1319    fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1320        // We ignore IWANT gossip from any peer whose score is below the gossip threshold
1321        if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1322            tracing::debug!(
1323                peer=%peer_id,
1324                "IWANT: ignoring peer with score below threshold [score = {}]",
1325                score
1326            );
1327            return;
1328        }
1329
1330        tracing::debug!(peer=%peer_id, "Handling IWANT for peer");
1331
1332        for id in iwant_msgs {
            // If we have it and the IWANT count is not above the retransmission threshold,
            // forward the message.
1335            if let Some((msg, count)) = self
1336                .mcache
1337                .get_with_iwant_counts(&id, peer_id)
1338                .map(|(msg, count)| (msg.clone(), count))
1339            {
1340                if count > self.config.gossip_retransimission() {
1341                    tracing::debug!(
1342                        peer=%peer_id,
1343                        message=%id,
1344                        "IWANT: Peer has asked for message too many times; ignoring request"
1345                    );
1346                } else {
1347                    tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
1348                    self.send_message(
1349                        *peer_id,
1350                        RpcOut::Forward {
1351                            message: msg,
1352                            timeout: Delay::new(self.config.forward_queue_duration()),
1353                        },
1354                    );
1355                }
1356            }
1357        }
1358        tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
1359    }
1360
1361    /// Handles GRAFT control messages. If subscribed to the topic, adds the peer to mesh, if not,
1362    /// responds with PRUNE messages.
1363    fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
1364        tracing::debug!(peer=%peer_id, "Handling GRAFT message for peer");
1365
1366        let mut to_prune_topics = HashSet::new();
1367
1368        let mut do_px = self.config.do_px();
1369
1370        let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
1371            tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
1372            return;
1373        };
1374
1375        // For each topic, if a peer has grafted us, then we necessarily must be in their mesh
1376        // and they must be subscribed to the topic. Ensure we have recorded the mapping.
1377        for topic in &topics {
1378            if connected_peer.topics.insert(topic.clone()) {
1379                if let Some(m) = self.metrics.as_mut() {
1380                    m.inc_topic_peers(topic);
1381                }
1382            }
1383        }
1384
1385        // we don't GRAFT to/from explicit peers; complain loudly if this happens
1386        if self.explicit_peers.contains(peer_id) {
1387            tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer");
1388            // this is possibly a bug from non-reciprocal configuration; send a PRUNE for all topics
1389            to_prune_topics = topics.into_iter().collect();
1390            // but don't PX
1391            do_px = false
1392        } else {
1393            let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0);
1394            let now = Instant::now();
1395            for topic_hash in topics {
1396                if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1397                    // if the peer is already in the mesh ignore the graft
1398                    if peers.contains(peer_id) {
1399                        tracing::debug!(
1400                            peer=%peer_id,
1401                            topic=%&topic_hash,
1402                            "GRAFT: Received graft for peer that is already in topic"
1403                        );
1404                        continue;
1405                    }
1406
1407                    // make sure we are not backing off that peer
1408                    if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
1409                    {
1410                        if backoff_time > now {
1411                            tracing::warn!(
1412                                peer=%peer_id,
1413                                "[Penalty] Peer attempted graft within backoff time, penalizing"
1414                            );
1415                            // add behavioural penalty
1416                            if let Some((peer_score, ..)) = &mut self.peer_score {
1417                                if let Some(metrics) = self.metrics.as_mut() {
1418                                    metrics.register_score_penalty(Penalty::GraftBackoff);
1419                                }
1420                                peer_score.add_penalty(peer_id, 1);
1421
1422                                // check the flood cutoff
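                                    // (re-GRAFTing within this window after the PRUNE is
                                    // considered graft flooding and earns an extra penalty)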
1423                                // See: https://github.com/rust-lang/rust-clippy/issues/10061
1424                                #[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
1425                                let flood_cutoff = (backoff_time
1426                                    + self.config.graft_flood_threshold())
1427                                    - self.config.prune_backoff();
1428                                if flood_cutoff > now {
1429                                    // extra penalty
1430                                    peer_score.add_penalty(peer_id, 1);
1431                                }
1432                            }
1433                            // no PX
1434                            do_px = false;
1435
1436                            to_prune_topics.insert(topic_hash.clone());
1437                            continue;
1438                        }
1439                    }
1440
1441                    // check the score
1442                    if below_zero {
1443                        // we don't GRAFT peers with negative score
1444                        tracing::debug!(
1445                            peer=%peer_id,
1446                            %score,
1447                            topic=%topic_hash,
1448                            "GRAFT: ignoring peer with negative score"
1449                        );
1450                        // we do send them PRUNE however, because it's a matter of protocol
1451                        // correctness
1452                        to_prune_topics.insert(topic_hash.clone());
1453                        // but we won't PX to them
1454                        do_px = false;
1455                        continue;
1456                    }
1457
1458                    // check mesh upper bound and only allow graft if the upper bound is not reached
1459                    // or if it is an outbound peer
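                        // (outbound peers are exempt from the upper bound so the mesh can keep
                        // at least mesh_outbound_min outbound connections; excess peers are
                        // trimmed back to mesh_n during the heartbeat)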
1460                    if peers.len() >= self.config.mesh_n_high()
1461                        && !self.outbound_peers.contains(peer_id)
1462                    {
1463                        to_prune_topics.insert(topic_hash.clone());
1464                        continue;
1465                    }
1466
1467                    // add peer to the mesh
1468                    tracing::debug!(
1469                        peer=%peer_id,
1470                        topic=%topic_hash,
1471                        "GRAFT: Mesh link added for peer in topic"
1472                    );
1473
1474                    if peers.insert(*peer_id) {
1475                        if let Some(m) = self.metrics.as_mut() {
1476                            m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
1477                        }
1478                    }
1479
1480                    // If the peer did not previously exist in any mesh, inform the handler
1481                    peer_added_to_mesh(
1482                        *peer_id,
1483                        vec![&topic_hash],
1484                        &self.mesh,
1485                        &mut self.events,
1486                        &self.connected_peers,
1487                    );
1488
1489                    if let Some((peer_score, ..)) = &mut self.peer_score {
1490                        peer_score.graft(peer_id, topic_hash);
1491                    }
1492                } else {
1493                    // don't do PX when there is an unknown topic to avoid leaking our peers
1494                    do_px = false;
1495                    tracing::debug!(
1496                        peer=%peer_id,
1497                        topic=%topic_hash,
1498                        "GRAFT: Received graft for unknown topic from peer"
1499                    );
1500                    // spam hardening: ignore GRAFTs for unknown topics
1501                    continue;
1502                }
1503            }
1504        }
1505
1506        if !to_prune_topics.is_empty() {
1507            // build the prune messages to send
1508            let on_unsubscribe = false;
1509
1510            for prune in to_prune_topics
1511                .iter()
1512                .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
1513                .collect::<Vec<_>>()
1514            {
1515                self.send_message(*peer_id, RpcOut::Prune(prune));
1516            }
1517            // Send the prune messages to the peer
1518            tracing::debug!(
1519                peer=%peer_id,
1520                "GRAFT: Not subscribed to topics - Sending PRUNE to peer"
1521            );
1522        }
1523        tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
1524    }
1525
1526    fn remove_peer_from_mesh(
1527        &mut self,
1528        peer_id: &PeerId,
1529        topic_hash: &TopicHash,
1530        backoff: Option<u64>,
1531        always_update_backoff: bool,
1532        reason: Churn,
1533    ) {
1534        let mut update_backoff = always_update_backoff;
1535        if let Some(peers) = self.mesh.get_mut(topic_hash) {
1536            // remove the peer if it exists in the mesh
1537            if peers.remove(peer_id) {
1538                tracing::debug!(
1539                    peer=%peer_id,
1540                    topic=%topic_hash,
1541                    "PRUNE: Removing peer from the mesh for topic"
1542                );
1543                if let Some(m) = self.metrics.as_mut() {
1544                    m.peers_removed(topic_hash, reason, 1)
1545                }
1546
1547                if let Some((peer_score, ..)) = &mut self.peer_score {
1548                    peer_score.prune(peer_id, topic_hash.clone());
1549                }
1550
1551                update_backoff = true;
1552
1553                // inform the handler
1554                peer_removed_from_mesh(
1555                    *peer_id,
1556                    topic_hash,
1557                    &self.mesh,
1558                    &mut self.events,
1559                    &self.connected_peers,
1560                );
1561            }
1562        }
1563        if update_backoff {
1564            // is there a backoff specified by the peer? if so, obey it.
1565            let time = if let Some(backoff) = backoff {
1566                Duration::from_secs(backoff)
1567            } else {
1568                self.config.prune_backoff()
1569            };
1570            self.backoffs.update_backoff(topic_hash, peer_id, time);
1571        }
1572    }
1573
1574    /// Handles PRUNE control messages. Removes peer from the mesh.
1575    fn handle_prune(
1576        &mut self,
1577        peer_id: &PeerId,
1578        prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1579    ) {
1580        tracing::debug!(peer=%peer_id, "Handling PRUNE message for peer");
1581        let (below_threshold, score) =
1582            self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
1583        for (topic_hash, px, backoff) in prune_data {
1584            self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune);
1585
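                // Only consider peer exchange if we are still subscribed to the topic.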
1586            if self.mesh.contains_key(&topic_hash) {
1587                // connect to px peers
1588                if !px.is_empty() {
1589                    // we ignore PX from peers with insufficient score
1590                    if below_threshold {
1591                        tracing::debug!(
1592                            peer=%peer_id,
1593                            %score,
1594                            topic=%topic_hash,
1595                            "PRUNE: ignoring PX from peer with insufficient score"
1596                        );
1597                        continue;
1598                    }
1599
1600                    // NOTE: We cannot dial any peers from PX currently as we typically will not
1601                    // know their multiaddr. Until SignedRecords are spec'd this
1602                    // remains a stub. By default `config.prune_peers()` is set to zero and
1603                    // this is skipped. If the user modifies this, this will only be able to
1604                    // dial already known peers (from an external discovery mechanism for
1605                    // example).
1606                    if self.config.prune_peers() > 0 {
1607                        self.px_connect(px);
1608                    }
1609                }
1610            }
1611        }
1612        tracing::debug!(peer=%peer_id, "Completed PRUNE handling for peer");
1613    }
1614
1615    fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1616        let n = self.config.prune_peers();
1617        // Ignore peerInfo with no ID
1618        //
1619        // TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a
1620        // signed peer record?
1621        px.retain(|p| p.peer_id.is_some());
1622        if px.len() > n {
1623            // use at most prune_peers random peers
1624            let mut rng = thread_rng();
1625            px.partial_shuffle(&mut rng, n);
1626            px = px.into_iter().take(n).collect();
1627        }
1628
1629        for p in px {
1630            // TODO: Once signed records are spec'd: extract signed peer record if given and handle
1631            // it, see https://github.com/libp2p/specs/pull/217
1632            if let Some(peer_id) = p.peer_id {
1633                // mark as px peer
1634                self.px_peers.insert(peer_id);
1635
1636                // dial peer
1637                self.events.push_back(ToSwarm::Dial {
1638                    opts: DialOpts::peer_id(peer_id).build(),
1639                });
1640            }
1641        }
1642    }
1643
1644    /// Applies some basic checks to determine whether this message is valid. Does not apply user
1645    /// validation checks.
1646    fn message_is_valid(
1647        &mut self,
1648        msg_id: &MessageId,
1649        raw_message: &mut RawMessage,
1650        propagation_source: &PeerId,
1651    ) -> bool {
1652        tracing::debug!(
1653            peer=%propagation_source,
1654            message=%msg_id,
1655            "Handling message from peer"
1656        );
1657
1658        // Reject any message from a blacklisted peer
1659        if self.blacklisted_peers.contains(propagation_source) {
1660            tracing::debug!(
1661                peer=%propagation_source,
1662                "Rejecting message from blacklisted peer"
1663            );
1664            self.gossip_promises
1665                .reject_message(msg_id, &RejectReason::BlackListedPeer);
1666            if let Some((peer_score, ..)) = &mut self.peer_score {
1667                peer_score.reject_message(
1668                    propagation_source,
1669                    msg_id,
1670                    &raw_message.topic,
1671                    RejectReason::BlackListedPeer,
1672                );
1673            }
1674            return false;
1675        }
1676
1677        // Also reject any message that originated from a blacklisted peer
1678        if let Some(source) = raw_message.source.as_ref() {
1679            if self.blacklisted_peers.contains(source) {
1680                tracing::debug!(
1681                    peer=%propagation_source,
1682                    %source,
1683                    "Rejecting message from peer because of blacklisted source"
1684                );
1685                self.handle_invalid_message(
1686                    propagation_source,
1687                    raw_message,
1688                    RejectReason::BlackListedSource,
1689                );
1690                return false;
1691            }
1692        }
1693
1694        // If we are not validating messages, assume this message is validated
1695        // This will allow the message to be gossiped without explicitly calling
1696        // `validate_message`.
1697        if !self.config.validate_messages() {
1698            raw_message.validated = true;
1699        }
1700
1701        // reject messages claiming to be from ourselves but not locally published
1702        let self_published = !self.config.allow_self_origin()
1703            && if let Some(own_id) = self.publish_config.get_own_id() {
1704                own_id != propagation_source
1705                    && raw_message.source.as_ref().is_some_and(|s| s == own_id)
1706            } else {
1707                self.published_message_ids.contains(msg_id)
1708            };
1709
1710        if self_published {
1711            tracing::debug!(
1712                message=%msg_id,
1713                source=%propagation_source,
1714                "Dropping message claiming to be from self but forwarded from source"
1715            );
1716            self.handle_invalid_message(propagation_source, raw_message, RejectReason::SelfOrigin);
1717            return false;
1718        }
1719
1720        true
1721    }
1722
1723    /// Handles a newly received [`RawMessage`].
1724    ///
1725    /// Forwards the message to all peers in the mesh.
1726    fn handle_received_message(
1727        &mut self,
1728        mut raw_message: RawMessage,
1729        propagation_source: &PeerId,
1730    ) {
1731        // Record the received metric
1732        if let Some(metrics) = self.metrics.as_mut() {
1733            metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
1734        }
1735
1736        // Try to apply the data transform to the message. If it fails, consider the message invalid.
1737        let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1738            Ok(message) => message,
1739            Err(e) => {
1740                tracing::debug!("Invalid message. Transform error: {:?}", e);
1741                // Reject the message and return
1742                self.handle_invalid_message(
1743                    propagation_source,
1744                    &raw_message,
1745                    RejectReason::ValidationError(ValidationError::TransformFailed),
1746                );
1747                return;
1748            }
1749        };
1750
1751        // Calculate the message id on the transformed data.
1752        let msg_id = self.config.message_id(&message);
1753
1754        // Broadcast IDONTWANT messages
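            // (only for messages above the configured size threshold, where suppressing
            // duplicate transmissions is worth the extra control traffic)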
1755        if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
1756            self.send_idontwant(&raw_message, &msg_id, Some(propagation_source));
1757        }
1758
1759        // Check the validity of the message
1760        // Peers get penalized if this message is invalid. We don't add it to the duplicate cache
1761        // and instead continually penalize peers that repeatedly send this message.
1762        if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1763            return;
1764        }
1765
1766        if !self.duplicate_cache.insert(msg_id.clone()) {
1767            tracing::debug!(message=%msg_id, "Message already received, ignoring");
1768            if let Some((peer_score, ..)) = &mut self.peer_score {
1769                peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1770            }
1771            self.mcache.observe_duplicate(&msg_id, propagation_source);
1772            return;
1773        }
1774
1775        tracing::debug!(
1776            message=%msg_id,
1777            "Put message in duplicate_cache and resolve promises"
1778        );
1779
1780        // Record the received message with the metrics
1781        if let Some(metrics) = self.metrics.as_mut() {
1782            metrics.msg_recvd(&message.topic);
1783        }
1784
1785        // Consider the message as delivered for gossip promises, resolving any pending
1786        // IWANT promises for this message id.
1787        self.gossip_promises.message_delivered(&msg_id);
1788
1789        // Tells score that message arrived (but is maybe not fully validated yet).
1790        if let Some((peer_score, ..)) = &mut self.peer_score {
1791            peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1792        }
1793
1794        // Add the message to our memcache
1795        self.mcache.put(&msg_id, raw_message.clone());
1796
1797        // Dispatch the message to the user if we are subscribed to any of the topics
1798        if self.mesh.contains_key(&message.topic) {
1799            tracing::debug!("Sending received message to user");
1800            self.events
1801                .push_back(ToSwarm::GenerateEvent(Event::Message {
1802                    propagation_source: *propagation_source,
1803                    message_id: msg_id.clone(),
1804                    message,
1805                }));
1806        } else {
1807            tracing::debug!(
1808                topic=%message.topic,
1809                "Received message on a topic we are not subscribed to"
1810            );
1811            return;
1812        }
1813
1814        // forward the message to mesh peers, if no validation is required
1815        if !self.config.validate_messages() {
1816            self.forward_msg(
1817                &msg_id,
1818                raw_message,
1819                Some(propagation_source),
1820                HashSet::new(),
1821            );
1822            tracing::debug!(message=%msg_id, "Completed message handling for message");
1823        }
1824    }
1825
1826    // Handles invalid messages received.
1827    fn handle_invalid_message(
1828        &mut self,
1829        propagation_source: &PeerId,
1830        raw_message: &RawMessage,
1831        reject_reason: RejectReason,
1832    ) {
1833        if let Some(metrics) = self.metrics.as_mut() {
1834            metrics.register_invalid_message(&raw_message.topic);
1835        }
1836
1837        let message = self.data_transform.inbound_transform(raw_message.clone());
1838
1839        match (&mut self.peer_score, message) {
1840            (Some((peer_score, ..)), Ok(message)) => {
1841                let message_id = self.config.message_id(&message);
1842
1843                peer_score.reject_message(
1844                    propagation_source,
1845                    &message_id,
1846                    &message.topic,
1847                    reject_reason,
1848                );
1849
1850                self.gossip_promises
1851                    .reject_message(&message_id, &reject_reason);
1852            }
1853            (Some((peer_score, ..)), Err(_)) => {
1854                // The message is invalid, so we reject it, ignoring any gossip promises. If a peer
1855                // advertised this message via an IHAVE and it is invalid, it will be doubly
1856                // penalized: once for sending us an invalid message and again for breaking a promise.
1857                peer_score.reject_invalid_message(propagation_source, &raw_message.topic);
1858            }
1859            (None, Ok(message)) => {
1860                // Valid transformation without peer scoring
1861                let message_id = self.config.message_id(&message);
1862                self.gossip_promises
1863                    .reject_message(&message_id, &reject_reason);
1864            }
1865            (None, Err(_)) => {}
1866        }
1867    }
1868
1869    /// Handles received subscriptions.
1870    fn handle_received_subscriptions(
1871        &mut self,
1872        subscriptions: &[Subscription],
1873        propagation_source: &PeerId,
1874    ) {
1875        tracing::debug!(
1876            source=%propagation_source,
1877            "Handling subscriptions: {:?}",
1878            subscriptions,
1879        );
1880
1881        let mut unsubscribed_peers = Vec::new();
1882
1883        let Some(peer) = self.connected_peers.get_mut(propagation_source) else {
1884            tracing::error!(
1885                peer=%propagation_source,
1886                "Subscription by unknown peer"
1887            );
1888            return;
1889        };
1890
1891        // Collect potential graft topics for the peer.
1892        let mut topics_to_graft = Vec::new();
1893
1894        // Notify the application about the subscription, after the grafts are sent.
1895        let mut application_event = Vec::new();
1896
1897        let filtered_topics = match self
1898            .subscription_filter
1899            .filter_incoming_subscriptions(subscriptions, &peer.topics)
1900        {
1901            Ok(topics) => topics,
1902            Err(s) => {
1903                tracing::error!(
1904                    peer=%propagation_source,
1905                    "Subscription filter error: {}; ignoring RPC from peer",
1906                    s
1907                );
1908                return;
1909            }
1910        };
1911
1912        for subscription in filtered_topics {
1913            // the topic this subscription refers to
1914            let topic_hash = &subscription.topic_hash;
1915
1916            match subscription.action {
1917                SubscriptionAction::Subscribe => {
1918                    if peer.topics.insert(topic_hash.clone()) {
1919                        tracing::debug!(
1920                            peer=%propagation_source,
1921                            topic=%topic_hash,
1922                            "SUBSCRIPTION: Adding gossip peer to topic"
1923                        );
1924
1925                        if let Some(m) = self.metrics.as_mut() {
1926                            m.inc_topic_peers(topic_hash);
1927                        }
1928                    }
1929
1930                    // if the mesh needs peers add the peer to the mesh
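                        // (only if the peer is not an explicit peer, speaks gossipsub, does not
                        // have a negative score, and is not currently backed off for this topic)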
1931                    if !self.explicit_peers.contains(propagation_source)
1932                        && peer.kind.is_gossipsub()
1933                        && !Self::score_below_threshold_from_scores(
1934                            &self.peer_score,
1935                            propagation_source,
1936                            |_| 0.0,
1937                        )
1938                        .0
1939                        && !self
1940                            .backoffs
1941                            .is_backoff_with_slack(topic_hash, propagation_source)
1942                    {
1943                        if let Some(peers) = self.mesh.get_mut(topic_hash) {
1944                            if peers.len() < self.config.mesh_n_low()
1945                                && peers.insert(*propagation_source)
1946                            {
1947                                tracing::debug!(
1948                                    peer=%propagation_source,
1949                                    topic=%topic_hash,
1950                                    "SUBSCRIPTION: Adding peer to the mesh for topic"
1951                                );
1952                                if let Some(m) = self.metrics.as_mut() {
1953                                    m.peers_included(topic_hash, Inclusion::Subscribed, 1)
1954                                }
1955                                // send graft to the peer
1956                                tracing::debug!(
1957                                    peer=%propagation_source,
1958                                    topic=%topic_hash,
1959                                    "Sending GRAFT to peer for topic"
1960                                );
1961                                if let Some((peer_score, ..)) = &mut self.peer_score {
1962                                    peer_score.graft(propagation_source, topic_hash.clone());
1963                                }
1964                                topics_to_graft.push(topic_hash.clone());
1965                            }
1966                        }
1967                    }
1968                    // generates a subscription event to be polled
1969                    application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
1970                        peer_id: *propagation_source,
1971                        topic: topic_hash.clone(),
1972                    }));
1973                }
1974                SubscriptionAction::Unsubscribe => {
1975                    if peer.topics.remove(topic_hash) {
1976                        tracing::debug!(
1977                            peer=%propagation_source,
1978                            topic=%topic_hash,
1979                            "SUBSCRIPTION: Removing gossip peer from topic"
1980                        );
1981
1982                        if let Some(m) = self.metrics.as_mut() {
1983                            m.dec_topic_peers(topic_hash);
1984                        }
1985                    }
1986
1987                    unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
1988                    // generate an unsubscribe event to be polled
1989                    application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
1990                        peer_id: *propagation_source,
1991                        topic: topic_hash.clone(),
1992                    }));
1993                }
1994            }
1995        }
1996
1997        // remove unsubscribed peers from the mesh and fanout if they exist there.
1998        for (peer_id, topic_hash) in unsubscribed_peers {
1999            if let Some(peers) = self.fanout.get_mut(&topic_hash) {
2000                peers.remove(&peer_id);
2001            }
2002            self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false, Churn::Unsub);
2003        }
2004
2005        // Potentially inform the handler if we have added this peer to a mesh for the first time.
2006        let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
2007        if !topics_joined.is_empty() {
2008            peer_added_to_mesh(
2009                *propagation_source,
2010                topics_joined,
2011                &self.mesh,
2012                &mut self.events,
2013                &self.connected_peers,
2014            );
2015        }
2016
2017        // If we need to send grafts to the peer, do so immediately rather than waiting for the
2018        // heartbeat.
2019        for topic_hash in topics_to_graft.into_iter() {
2020            self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash }));
2021        }
2022
2023        // Notify the application of the subscriptions
2024        for event in application_event {
2025            self.events.push_back(event);
2026        }
2027
2028        tracing::trace!(
2029            source=%propagation_source,
2030            "Completed handling subscriptions from source"
2031        );
2032    }
2033
2034    /// Applies penalties to peers that did not respond to our IWANT requests.
2035    fn apply_iwant_penalties(&mut self) {
2036        if let Some((peer_score, ..)) = &mut self.peer_score {
2037            for (peer, count) in self.gossip_promises.get_broken_promises() {
2038                peer_score.add_penalty(&peer, count);
2039                if let Some(metrics) = self.metrics.as_mut() {
2040                    metrics.register_score_penalty(Penalty::BrokenPromise);
2041                }
2042            }
2043        }
2044    }
2045
2046    /// Heartbeat function which shifts the memcache and updates the mesh.
2047    fn heartbeat(&mut self) {
2048        tracing::debug!("Starting heartbeat");
2049        let start = Instant::now();
2050
2051        // Every heartbeat we sample the send queues to add to our metrics. We do this intentionally
2052        // before we add all the gossip from this heartbeat in order to gain a true measure of the
2053        // steady-state size of the queues.
2054        if let Some(m) = &mut self.metrics {
2055            for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
2056                m.observe_priority_queue_size(sender_queue.priority_queue_len());
2057                m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
2058            }
2059        }
2060
2061        self.heartbeat_ticks += 1;
2062
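            // Mesh changes are accumulated below and sent as a single batch of GRAFT/PRUNE
            // messages at the end of the heartbeat; peers in `no_px` are pruned without
            // peer exchange.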
2063        let mut to_graft = HashMap::new();
2064        let mut to_prune = HashMap::new();
2065        let mut no_px = HashSet::new();
2066
2067        // clean up expired backoffs
2068        self.backoffs.heartbeat();
2069
2070        // reset the per-heartbeat IHAVE/IWANT counters
2071        self.count_sent_iwant.clear();
2072        self.count_received_ihave.clear();
2073
2074        // apply iwant penalties
2075        self.apply_iwant_penalties();
2076
2077        // check connections to explicit peers
2078        if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
2079            for p in self.explicit_peers.clone() {
2080                self.check_explicit_peer_connection(&p);
2081            }
2082        }
2083
2084        // Cache the scores of all connected peers, and record metrics for current penalties.
2085        let mut scores = HashMap::with_capacity(self.connected_peers.len());
2086        if let Some((peer_score, ..)) = &self.peer_score {
2087            for peer_id in self.connected_peers.keys() {
2088                scores
2089                    .entry(peer_id)
2090                    .or_insert_with(|| peer_score.metric_score(peer_id, self.metrics.as_mut()));
2091            }
2092        }
2093
2094        // maintain the mesh for each topic
2095        for (topic_hash, peers) in self.mesh.iter_mut() {
2096            let explicit_peers = &self.explicit_peers;
2097            let backoffs = &self.backoffs;
2098            let outbound_peers = &self.outbound_peers;
2099
2100            // drop all peers with negative score, without PX
2101            // if there is at some point a stable retain method for BTreeSet the following can be
2102            // written more efficiently with retain.
2103            let mut to_remove_peers = Vec::new();
2104            for peer_id in peers.iter() {
2105                let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
2106
2107                // Record the score per mesh
2108                if let Some(metrics) = self.metrics.as_mut() {
2109                    metrics.observe_mesh_peers_score(topic_hash, peer_score);
2110                }
2111
2112                if peer_score < 0.0 {
2113                    tracing::debug!(
2114                        peer=%peer_id,
2115                        score=%peer_score,
2116                        topic=%topic_hash,
2117                        "HEARTBEAT: Prune peer with negative score"
2118                    );
2119
2120                    let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
2121                    current_topic.push(topic_hash.clone());
2122                    no_px.insert(*peer_id);
2123                    to_remove_peers.push(*peer_id);
2124                }
2125            }
2126
2127            if let Some(m) = self.metrics.as_mut() {
2128                m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
2129            }
2130
2131            for peer_id in to_remove_peers {
2132                peers.remove(&peer_id);
2133            }
2134
2135            // too few peers - add some
2136            if peers.len() < self.config.mesh_n_low() {
2137                tracing::debug!(
2138                    topic=%topic_hash,
2139                    "HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
2140                    peers.len(),
2141                    self.config.mesh_n_low()
2142                );
2143                // not enough peers - add (mesh_n - current_length) more
2144                let desired_peers = self.config.mesh_n() - peers.len();
2145                let peer_list =
2146                    get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2147                        !peers.contains(peer)
2148                            && !explicit_peers.contains(peer)
2149                            && !backoffs.is_backoff_with_slack(topic_hash, peer)
2150                            && *scores.get(peer).unwrap_or(&0.0) >= 0.0
2151                    });
2152                for peer in &peer_list {
2153                    let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2154                    current_topic.push(topic_hash.clone());
2155                }
2156                // update the mesh
2157                tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2158                if let Some(m) = self.metrics.as_mut() {
2159                    m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2160                }
2161                peers.extend(peer_list);
2162            }
2163
2164            // too many peers - remove some
2165            if peers.len() > self.config.mesh_n_high() {
2166                tracing::debug!(
2167                    topic=%topic_hash,
2168                    "HEARTBEAT: Mesh high. Topic contains: {} needs: {}",
2169                    peers.len(),
2170                    self.config.mesh_n_high()
2171                );
2172                let excess_peer_no = peers.len() - self.config.mesh_n();
2173
2174                // shuffle the peers and then sort by score ascending beginning with the worst
2175                let mut rng = thread_rng();
2176                let mut shuffled = peers.iter().copied().collect::<Vec<_>>();
2177                shuffled.shuffle(&mut rng);
2178                shuffled.sort_by(|p1, p2| {
2179                    let score_p1 = *scores.get(p1).unwrap_or(&0.0);
2180                    let score_p2 = *scores.get(p2).unwrap_or(&0.0);
2181
2182                    score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
2183                });
2184                // shuffle everything except the last retain_scores many peers (the best ones)
2185                shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
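                    // The highest-scoring retain_scores peers stay at the end of the list and
                    // are therefore considered for pruning last.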
2186
2187                // count total number of outbound peers
2188                let mut outbound = {
2189                    let outbound_peers = &self.outbound_peers;
2190                    shuffled
2191                        .iter()
2192                        .filter(|p| outbound_peers.contains(*p))
2193                        .count()
2194                };
2195
2196                // remove the first excess_peer_no peers allowed by the outbound restrictions,
2197                // adding them to to_prune
2198                let mut removed = 0;
2199                for peer in shuffled {
2200                    if removed == excess_peer_no {
2201                        break;
2202                    }
2203                    if self.outbound_peers.contains(&peer) {
2204                        if outbound <= self.config.mesh_outbound_min() {
2205                            // do not remove any more outbound peers
2206                            continue;
2207                        }
2208                        // an outbound peer gets removed
2209                        outbound -= 1;
2210                    }
2211
2212                    // remove the peer
2213                    peers.remove(&peer);
2214                    let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2215                    current_topic.push(topic_hash.clone());
2216                    removed += 1;
2217                }
2218
2219                if let Some(m) = self.metrics.as_mut() {
2220                    m.peers_removed(topic_hash, Churn::Excess, removed)
2221                }
2222            }
2223
2224            // do we have enough outbound peers?
2225            if peers.len() >= self.config.mesh_n_low() {
2226                // count number of outbound peers we have
2227                let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() };
2228
2229                // if we do not have enough outbound peers, graft to some new outbound peers
2230                if outbound < self.config.mesh_outbound_min() {
2231                    let needed = self.config.mesh_outbound_min() - outbound;
2232                    let peer_list =
2233                        get_random_peers(&self.connected_peers, topic_hash, needed, |peer| {
2234                            !peers.contains(peer)
2235                                && !explicit_peers.contains(peer)
2236                                && !backoffs.is_backoff_with_slack(topic_hash, peer)
2237                                && *scores.get(peer).unwrap_or(&0.0) >= 0.0
2238                                && outbound_peers.contains(peer)
2239                        });
2240                    for peer in &peer_list {
2241                        let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2242                        current_topic.push(topic_hash.clone());
2243                    }
2244                    // update the mesh
2245                    tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2246                    if let Some(m) = self.metrics.as_mut() {
2247                        m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
2248                    }
2249                    peers.extend(peer_list);
2250                }
2251            }
2252
2253            // should we try to improve the mesh with opportunistic grafting?
2254            if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2255                && peers.len() > 1
2256                && self.peer_score.is_some()
2257            {
2258                if let Some((_, thresholds, _)) = &self.peer_score {
2259                    // Opportunistic grafting works as follows: we check the median score of peers
2260                    // in the mesh; if this score is below the opportunistic_graft_threshold, we
2261                    // select a few peers at random with a score above the median.
2262                    // The intention is to (slowly) improve an underperforming mesh by introducing
2263                    // good-scoring peers that may have been gossiping to us. This allows us to
2264                    // get out of sticky situations where we are stuck with poor peers and also
2265                    // recover from churn of good peers.
2266
2267                    // now compute the median peer score in the mesh
2268                    let mut peers_by_score: Vec<_> = peers.iter().collect();
2269                    peers_by_score.sort_by(|p1, p2| {
2270                        let p1_score = *scores.get(p1).unwrap_or(&0.0);
2271                        let p2_score = *scores.get(p2).unwrap_or(&0.0);
2272                        p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
2273                    });
2274
2275                    let middle = peers_by_score.len() / 2;
2276                    let median = if peers_by_score.len() % 2 == 0 {
2277                        let sub_middle_peer = *peers_by_score
2278                            .get(middle - 1)
2279                            .expect("middle < vector length and middle > 0 since peers.len() > 0");
2280                        let sub_middle_score = *scores.get(sub_middle_peer).unwrap_or(&0.0);
2281                        let middle_peer =
2282                            *peers_by_score.get(middle).expect("middle < vector length");
2283                        let middle_score = *scores.get(middle_peer).unwrap_or(&0.0);
2284
2285                        (sub_middle_score + middle_score) * 0.5
2286                    } else {
2287                        *scores
2288                            .get(*peers_by_score.get(middle).expect("middle < vector length"))
2289                            .unwrap_or(&0.0)
2290                    };
2291
2292                    // if the median score is below the threshold, select a better peer (if any) and
2293                    // GRAFT
2294                    if median < thresholds.opportunistic_graft_threshold {
2295                        let peer_list = get_random_peers(
2296                            &self.connected_peers,
2297                            topic_hash,
2298                            self.config.opportunistic_graft_peers(),
2299                            |peer_id| {
2300                                !peers.contains(peer_id)
2301                                    && !explicit_peers.contains(peer_id)
2302                                    && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2303                                    && *scores.get(peer_id).unwrap_or(&0.0) > median
2304                            },
2305                        );
2306                        for peer in &peer_list {
2307                            let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2308                            current_topic.push(topic_hash.clone());
2309                        }
2310                        // update the mesh
2311                        tracing::debug!(
2312                            topic=%topic_hash,
2313                            "Opportunistically graft in topic with peers {:?}",
2314                            peer_list
2315                        );
2316                        if let Some(m) = self.metrics.as_mut() {
2317                            m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2318                        }
2319                        peers.extend(peer_list);
2320                    }
2321                }
2322            }
2323            // Register the final count of peers in the mesh
2324            if let Some(m) = self.metrics.as_mut() {
2325                m.set_mesh_peers(topic_hash, peers.len())
2326            }
2327        }
2328
2329        // remove expired fanout topics
2330        {
2331            let fanout = &mut self.fanout; // help the borrow checker
2332            let fanout_ttl = self.config.fanout_ttl();
2333            self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2334                if *last_pub_time + fanout_ttl < Instant::now() {
2335                    tracing::debug!(
2336                        topic=%topic_hash,
2337                        "HEARTBEAT: Fanout topic removed due to timeout"
2338                    );
2339                    fanout.remove(topic_hash);
2340                    return false;
2341                }
2342                true
2343            });
2344        }
2345
2346        // maintain fanout
2347        // check if our peers are still a part of the topic
2348        for (topic_hash, peers) in self.fanout.iter_mut() {
2349            let mut to_remove_peers = Vec::new();
2350            let publish_threshold = match &self.peer_score {
2351                Some((_, thresholds, _)) => thresholds.publish_threshold,
2352                _ => 0.0,
2353            };
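                // Fanout peers receive the messages we publish to topics we are not subscribed
                // to, so they must meet the publish threshold.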
2354            for peer_id in peers.iter() {
2355                // is the peer still subscribed to the topic?
2356                let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
2357                match self.connected_peers.get(peer_id) {
2358                    Some(peer) => {
2359                        if !peer.topics.contains(topic_hash) || peer_score < publish_threshold {
2360                            tracing::debug!(
2361                                topic=%topic_hash,
2362                                "HEARTBEAT: Peer removed from fanout for topic"
2363                            );
2364                            to_remove_peers.push(*peer_id);
2365                        }
2366                    }
2367                    None => {
2368                        // remove if the peer has disconnected
2369                        to_remove_peers.push(*peer_id);
2370                    }
2371                }
2372            }
2373            for to_remove in to_remove_peers {
2374                peers.remove(&to_remove);
2375            }
2376
2377            // not enough peers
2378            if peers.len() < self.config.mesh_n() {
2379                tracing::debug!(
2380                    "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2381                    peers.len(),
2382                    self.config.mesh_n()
2383                );
2384                let needed_peers = self.config.mesh_n() - peers.len();
2385                let explicit_peers = &self.explicit_peers;
2386                let new_peers =
2387                    get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
2388                        !peers.contains(peer_id)
2389                            && !explicit_peers.contains(peer_id)
2390                            && *scores.get(peer_id).unwrap_or(&0.0) >= publish_threshold
2391                    });
2392                peers.extend(new_peers);
2393            }
2394        }
2395
2396        if self.peer_score.is_some() {
2397            tracing::trace!("Mesh message deliveries: {:?}", {
2398                self.mesh
2399                    .iter()
2400                    .map(|(t, peers)| {
2401                        (
2402                            t.clone(),
2403                            peers
2404                                .iter()
2405                                .map(|p| {
2406                                    (
2407                                        *p,
2408                                        self.peer_score
2409                                            .as_ref()
2410                                            .expect("peer_score.is_some()")
2411                                            .0
2412                                            .mesh_message_deliveries(p, t)
2413                                            .unwrap_or(0.0),
2414                                    )
2415                                })
2416                                .collect::<HashMap<PeerId, f64>>(),
2417                        )
2418                    })
2419                    .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2420            })
2421        }
2422
2423        self.emit_gossip();
2424
2425        // send graft/prunes
2426        if !to_graft.is_empty() || !to_prune.is_empty() {
2427            self.send_graft_prune(to_graft, to_prune, no_px);
2428        }
2429
2430        // shift the memcache
2431        self.mcache.shift();
2432
2433        // Report expired messages
2434        for (peer_id, failed_messages) in self.failed_messages.drain() {
2435            tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
2436            self.events
2437                .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
2438                    peer_id,
2439                    failed_messages,
2440                }));
2441        }
2442        self.failed_messages.shrink_to_fit();
2443
2444        // Flush stale IDONTWANTs.
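            // Entries are stored in insertion order, so we can stop at the first entry that
            // has not yet timed out.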
2445        for peer in self.connected_peers.values_mut() {
2446            while let Some((_front, instant)) = peer.dont_send.front() {
2447                if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
2448                    break;
2449                } else {
2450                    peer.dont_send.pop_front();
2451                }
2452            }
2453        }
2454
2455        tracing::debug!("Completed Heartbeat");
2456        if let Some(metrics) = self.metrics.as_mut() {
2457            let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2458            metrics.observe_heartbeat_duration(duration);
2459        }
2460    }
2461
2462    /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
2463    /// and fanout peers
2464    fn emit_gossip(&mut self) {
2465        let mut rng = thread_rng();
2466        let mut messages = Vec::new();
2467        for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2468            let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
2469            if message_ids.is_empty() {
2470                continue;
2471            }
2472
2473            // if we are emitting more than max_ihave_length message_ids, truncate the list
2474            if message_ids.len() > self.config.max_ihave_length() {
2475                // we do the truncation (with shuffling) per peer below
2476                tracing::debug!(
2477                    "too many messages for gossip; will truncate IHAVE list ({} messages)",
2478                    message_ids.len()
2479                );
2480            } else {
2481                // shuffle to emit in random order
2482                message_ids.shuffle(&mut rng);
2483            }
2484
2485            // dynamic number of peers to gossip based on `gossip_factor` with minimum `gossip_lazy`
2486            let n_map = |m| {
2487                max(
2488                    self.config.gossip_lazy(),
2489                    (self.config.gossip_factor() * m as f64) as usize,
2490                )
2491            };
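                // e.g. with gossip_lazy = 6 and gossip_factor = 0.25, a topic with 40 eligible
                // peers yields max(6, 0.25 * 40) = 10 gossip targets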
2492            // get gossip_lazy random peers
2493            let to_msg_peers =
2494                get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2495                    !peers.contains(peer)
2496                        && !self.explicit_peers.contains(peer)
2497                        && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0
2498                });
2499
2500            tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
2501
2502            for peer_id in to_msg_peers {
2503                let mut peer_message_ids = message_ids.clone();
2504
2505                if peer_message_ids.len() > self.config.max_ihave_length() {
2506                    // We do this per peer so that we emit a different set for each peer.
2507                    // There is enough redundancy in the system that this significantly increases
2508                    // the message coverage when we do truncate.
2509                    peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2510                    peer_message_ids.truncate(self.config.max_ihave_length());
2511                }
2512
2513                // send an IHAVE message
2514                messages.push((
2515                    peer_id,
2516                    RpcOut::IHave(IHave {
2517                        topic_hash: topic_hash.clone(),
2518                        message_ids: peer_message_ids,
2519                    }),
2520                ));
2521            }
2522        }
2523        for (peer_id, message) in messages {
2524            self.send_message(peer_id, message);
2525        }
2526    }
2527
2528    /// Handles multiple GRAFT/PRUNE messages and coalesces them into chunked gossip control
2529    /// messages.
2530    fn send_graft_prune(
2531        &mut self,
2532        to_graft: HashMap<PeerId, Vec<TopicHash>>,
2533        mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2534        no_px: HashSet<PeerId>,
2535    ) {
2536        // handle the grafts and overlapping prunes per peer
2537        for (peer_id, topics) in to_graft.into_iter() {
2538            for topic in &topics {
2539                // inform scoring of graft
2540                if let Some((peer_score, ..)) = &mut self.peer_score {
2541                    peer_score.graft(&peer_id, topic.clone());
2542                }
2543
2544                // inform the handler of the peer being added to the mesh
2545                // If the peer did not previously exist in any mesh, inform the handler
2546                peer_added_to_mesh(
2547                    peer_id,
2548                    vec![topic],
2549                    &self.mesh,
2550                    &mut self.events,
2551                    &self.connected_peers,
2552                );
2553            }
2554            let rpc_msgs = topics.iter().map(|topic_hash| {
2555                RpcOut::Graft(Graft {
2556                    topic_hash: topic_hash.clone(),
2557                })
2558            });
2559
2560            // If there are prunes associated with the same peer add them.
2561            // NOTE: In this case a peer has been added to a topic mesh, and removed from another.
2562            // It therefore must be in at least one mesh and we do not need to inform the handler
2563            // of its removal from another.
2564
2565            // The following prunes are not due to unsubscribing.
2566            let prune_msgs = to_prune
2567                .remove(&peer_id)
2568                .into_iter()
2569                .flatten()
2570                .map(|topic_hash| {
2571                    let prune = self.make_prune(
2572                        &topic_hash,
2573                        &peer_id,
2574                        self.config.do_px() && !no_px.contains(&peer_id),
2575                        false,
2576                    );
2577                    RpcOut::Prune(prune)
2578                });
2579
2580            // send the rpc messages
2581            for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() {
2582                self.send_message(peer_id, msg);
2583            }
2584        }
2585
2586        // handle the remaining prunes
2587        // The following prunes are not due to unsubscribing.
2588        for (peer_id, topics) in to_prune.iter() {
2589            for topic_hash in topics {
2590                let prune = self.make_prune(
2591                    topic_hash,
2592                    peer_id,
2593                    self.config.do_px() && !no_px.contains(peer_id),
2594                    false,
2595                );
2596                self.send_message(*peer_id, RpcOut::Prune(prune));
2597
2598                // inform the handler
2599                peer_removed_from_mesh(
2600                    *peer_id,
2601                    topic_hash,
2602                    &self.mesh,
2603                    &mut self.events,
2604                    &self.connected_peers,
2605                );
2606            }
2607        }
2608    }
2609
2610    /// Helper function which sends an IDONTWANT message to mesh\[topic\] peers.
2611    fn send_idontwant(
2612        &mut self,
2613        message: &RawMessage,
2614        msg_id: &MessageId,
2615        propagation_source: Option<&PeerId>,
2616    ) {
2617        let Some(mesh_peers) = self.mesh.get(&message.topic) else {
2618            return;
2619        };
2620
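        // In addition to the mesh, target peers that have promised to deliver this message in
        // response to one of our IWANT requests; an IDONTWANT saves them from sending it to us.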
2621        let iwant_peers = self.gossip_promises.peers_for_message(msg_id);
2622
2623        let recipient_peers: Vec<PeerId> = mesh_peers
2624            .iter()
2625            .chain(iwant_peers.iter())
2626            .filter(|&peer_id| {
2627                Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref()
2628            })
2629            .cloned()
2630            .collect();
2631
2632        for peer_id in recipient_peers {
2633            let Some(peer) = self.connected_peers.get_mut(&peer_id) else {
2634                tracing::error!(peer = %peer_id,
2635                    "Could not send IDONTWANT, peer doesn't exist in connected peer list");
2636                continue;
2637            };
2638
2639            // Only gossipsub 1.2 peers support IDONTWANT.
2640            if peer.kind != PeerKind::Gossipsubv1_2 {
2641                continue;
2642            }
2643
2644            self.send_message(
2645                peer_id,
2646                RpcOut::IDontWant(IDontWant {
2647                    message_ids: vec![msg_id.clone()],
2648                }),
2649            );
2650        }
2651    }
2652
2653    /// Helper function which forwards a message to mesh\[topic\] peers and to subscribed explicit peers.
2654    ///
2655    /// Returns true if at least one peer was messaged.
2656    fn forward_msg(
2657        &mut self,
2658        msg_id: &MessageId,
2659        message: RawMessage,
2660        propagation_source: Option<&PeerId>,
2661        originating_peers: HashSet<PeerId>,
2662    ) -> bool {
2663        // The message is fully validated; inform the peer scoring system.
2664        if let Some((peer_score, ..)) = &mut self.peer_score {
2665            if let Some(peer) = propagation_source {
2666                peer_score.deliver_message(peer, msg_id, &message.topic);
2667            }
2668        }
2669
2670        tracing::debug!(message=%msg_id, "Forwarding message");
2671        let mut recipient_peers = HashSet::new();
2672
2673        // Populate the set of recipient peers
2674
2675        // Add explicit peers
2676        for peer_id in &self.explicit_peers {
2677            let Some(peer) = self.connected_peers.get(peer_id) else {
2678                continue;
2679            };
2680            if Some(peer_id) != propagation_source
2681                && !originating_peers.contains(peer_id)
2682                && Some(peer_id) != message.source.as_ref()
2683                && peer.topics.contains(&message.topic)
2684            {
2685                recipient_peers.insert(*peer_id);
2686            }
2687        }
2688
2689        // add mesh peers
2690        let topic = &message.topic;
2691        // mesh
2692        if let Some(mesh_peers) = self.mesh.get(topic) {
2693            for peer_id in mesh_peers {
2694                if Some(peer_id) != propagation_source
2695                    && !originating_peers.contains(peer_id)
2696                    && Some(peer_id) != message.source.as_ref()
2697                {
2698                    recipient_peers.insert(*peer_id);
2699                }
2700            }
2701        }
2702
2703        if recipient_peers.is_empty() {
2704            return false;
2705        }
2706
2707        // forward the message to peers
2708        for peer_id in recipient_peers.iter() {
2709            if let Some(peer) = self.connected_peers.get_mut(peer_id) {
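                // Honour IDONTWANT: skip peers that have told us they already have this message.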
2710                if peer.dont_send.contains_key(msg_id) {
2711                    tracing::debug!(%peer_id, message=%msg_id, "Peer doesn't want message");
2712                    continue;
2713                }
2714
2715                tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer");
2716
2717                self.send_message(
2718                    *peer_id,
2719                    RpcOut::Forward {
2720                        message: message.clone(),
2721                        timeout: Delay::new(self.config.forward_queue_duration()),
2722                    },
2723                );
2724            }
2725        }
2726        tracing::debug!("Completed forwarding message");
2727        true
2728    }
2729
2730    /// Constructs a [`RawMessage`] performing message signing if required.
2731    pub(crate) fn build_raw_message(
2732        &mut self,
2733        topic: TopicHash,
2734        data: Vec<u8>,
2735    ) -> Result<RawMessage, PublishError> {
2736        match &mut self.publish_config {
2737            PublishConfig::Signing {
2738                ref keypair,
2739                author,
2740                inline_key,
2741                last_seq_no,
2742            } => {
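                // Signed messages carry a strictly increasing sequence number; with the default
                // message-id function this, together with the author, identifies the message.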
2743                let sequence_number = last_seq_no.next();
2744
2745                let signature = {
2746                    let message = proto::Message {
2747                        from: Some(author.to_bytes()),
2748                        data: Some(data.clone()),
2749                        seqno: Some(sequence_number.to_be_bytes().to_vec()),
2750                        topic: topic.clone().into_string(),
2751                        signature: None,
2752                        key: None,
2753                    };
2754
2755                    let mut buf = Vec::with_capacity(message.get_size());
2756                    let mut writer = Writer::new(&mut buf);
2757
2758                    message
2759                        .write_message(&mut writer)
2760                        .expect("Encoding to succeed");
2761
2762                    // the signature is over the bytes "libp2p-pubsub:<protobuf-message>"
2763                    let mut signature_bytes = SIGNING_PREFIX.to_vec();
2764                    signature_bytes.extend_from_slice(&buf);
2765                    Some(keypair.sign(&signature_bytes)?)
2766                };
2767
2768                Ok(RawMessage {
2769                    source: Some(*author),
2770                    data,
2771                    // To be interoperable with the go-implementation this is treated as a 64-bit
2772                    // big-endian uint.
2773                    sequence_number: Some(sequence_number),
2774                    topic,
2775                    signature,
2776                    key: inline_key.clone(),
2777                    validated: true, // all published messages are valid
2778                })
2779            }
2780            PublishConfig::Author(peer_id) => {
2781                Ok(RawMessage {
2782                    source: Some(*peer_id),
2783                    data,
2784                    // To be interoperable with the go-implementation this is treated as a 64-bit
2785                    // big-endian uint.
2786                    sequence_number: Some(rand::random()),
2787                    topic,
2788                    signature: None,
2789                    key: None,
2790                    validated: true, // all published messages are valid
2791                })
2792            }
2793            PublishConfig::RandomAuthor => {
2794                Ok(RawMessage {
2795                    source: Some(PeerId::random()),
2796                    data,
2797                    // To be interoperable with the go-implementation this is treated as a 64-bit
2798                    // big-endian uint.
2799                    sequence_number: Some(rand::random()),
2800                    topic,
2801                    signature: None,
2802                    key: None,
2803                    validated: true, // all published messages are valid
2804                })
2805            }
2806            PublishConfig::Anonymous => {
2807                Ok(RawMessage {
2808                    source: None,
2809                    data,
2810                    // To be interoperable with the go-implementation this is treated as a 64-bit
2811                    // big-endian uint.
2812                    sequence_number: None,
2813                    topic,
2814                    signature: None,
2815                    key: None,
2816                    validated: true, // all published messages are valid
2817                })
2818            }
2819        }
2820    }
2821
2822    /// Send a [`RpcOut`] message to a peer.
2823    ///
2824    /// Returns `true` if sending was successful, `false` otherwise.
2825    /// The method will update the peer score and failed message counter if
2826    /// sending the message failed due to the channel to the connection handler being
2827    /// full (which indicates a slow peer).
2828    fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
2829        if let Some(m) = self.metrics.as_mut() {
2830            if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc {
2831                // register bytes sent on the internal metrics.
2832                m.msg_sent(&message.topic, message.raw_protobuf_len());
2833            }
2834        }
2835
2836        let Some(peer) = self.connected_peers.get_mut(&peer_id) else {
2837            tracing::error!(peer = %peer_id,
2838                    "Could not send rpc to connection handler, peer doesn't exist in connected peer list");
2839            return false;
2840        };
2841
2842        // Try sending the message to the connection handler.
2843        match peer.sender.send_message(rpc) {
2844            Ok(()) => true,
2845            Err(rpc) => {
2846                // Sending failed because the channel is full.
2847                tracing::warn!(peer=%peer_id, "Send queue full. Could not send {:?}.", rpc);
2848
2849                // Update failed message counter.
2850                let failed_messages = self.failed_messages.entry(peer_id).or_default();
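                // Publish messages count against the priority queue and forward/gossip messages
                // against the non-priority queue; GRAFT, PRUNE, SUBSCRIBE and UNSUBSCRIBE use an
                // unbounded channel and can therefore never fail here.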
2851                match rpc {
2852                    RpcOut::Publish { .. } => {
2853                        failed_messages.priority += 1;
2854                        failed_messages.publish += 1;
2855                    }
2856                    RpcOut::Forward { .. } => {
2857                        failed_messages.non_priority += 1;
2858                        failed_messages.forward += 1;
2859                    }
2860                    RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => {
2861                        failed_messages.non_priority += 1;
2862                    }
2863                    RpcOut::Graft(_)
2864                    | RpcOut::Prune(_)
2865                    | RpcOut::Subscribe(_)
2866                    | RpcOut::Unsubscribe(_) => {
2867                        unreachable!("Channel for high-priority control messages is unbounded and should always be open.")
2868                    }
2869                }
2870
2871                // Update peer score.
2872                if let Some((peer_score, ..)) = &mut self.peer_score {
2873                    peer_score.failed_message_slow_peer(&peer_id);
2874                }
2875
2876                false
2877            }
2878        }
2879    }
2880
2881    fn on_connection_established(
2882        &mut self,
2883        ConnectionEstablished {
2884            peer_id,
2885            endpoint,
2886            other_established,
2887            ..
2888        }: ConnectionEstablished,
2889    ) {
2890        // Diverging from the go implementation, we only consider a peer an outbound peer
2891        // if its first connection is outbound.
2892
2893        if endpoint.is_dialer() && other_established == 0 && !self.px_peers.contains(&peer_id) {
2894            // The first connection is outbound and it is not a peer from peer exchange => mark
2895            // it as outbound peer
2896            self.outbound_peers.insert(peer_id);
2897        }
2898
2899        // Add the IP to the peer scoring system
2900        if let Some((peer_score, ..)) = &mut self.peer_score {
2901            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2902                peer_score.add_ip(&peer_id, ip);
2903            } else {
2904                tracing::trace!(
2905                    peer=%peer_id,
2906                    "Couldn't extract ip address from the peer's endpoint {:?}",
2907                    endpoint
2908                )
2909            }
2910        }
2911
2912        if other_established > 0 {
2913            return; // Not our first connection to this peer, hence nothing to do.
2914        }
2915
2916        if let Some((peer_score, ..)) = &mut self.peer_score {
2917            peer_score.add_peer(peer_id);
2918        }
2919
2920        // Ignore connections from blacklisted peers.
2921        if self.blacklisted_peers.contains(&peer_id) {
2922            tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
2923            return;
2924        }
2925
2926        tracing::debug!(peer=%peer_id, "New peer connected");
2927        // We need to send our subscriptions to the newly-connected node.
2928        for topic_hash in self.mesh.clone().into_keys() {
2929            self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2930        }
2931    }
2932
2933    fn on_connection_closed(
2934        &mut self,
2935        ConnectionClosed {
2936            peer_id,
2937            connection_id,
2938            endpoint,
2939            remaining_established,
2940            ..
2941        }: ConnectionClosed,
2942    ) {
2943        // Remove IP from peer scoring system
2944        if let Some((peer_score, ..)) = &mut self.peer_score {
2945            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2946                peer_score.remove_ip(&peer_id, &ip);
2947            } else {
2948                tracing::trace!(
2949                    peer=%peer_id,
2950                    "Couldn't extract ip address from the peer's endpoint {:?}",
2951                    endpoint
2952                )
2953            }
2954        }
2955
2956        if remaining_established != 0 {
2957            // Remove the connection from the list
2958            if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
2959                let index = peer
2960                    .connections
2961                    .iter()
2962                    .position(|v| v == &connection_id)
2963                    .expect("Previously established connection to peer must be present");
2964                peer.connections.remove(index);
2965
2966                // If there are more connections and this peer is in a mesh, inform the first
2967                // connection handler.
2968                if !peer.connections.is_empty() {
2969                    for topic in &peer.topics {
2970                        if let Some(mesh_peers) = self.mesh.get(topic) {
2971                            if mesh_peers.contains(&peer_id) {
2972                                self.events.push_back(ToSwarm::NotifyHandler {
2973                                    peer_id,
2974                                    event: HandlerIn::JoinedMesh,
2975                                    handler: NotifyHandler::One(peer.connections[0]),
2976                                });
2977                                break;
2978                            }
2979                        }
2980                    }
2981                }
2982            }
2983        } else {
2984            // remove the peer from the mesh, fanout and all other internal mappings
2985            tracing::debug!(peer=%peer_id, "Peer disconnected");
2986            let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
2987                tracing::error!(peer_id = %peer_id, "Peer non-existent when handling disconnection");
2988                return;
2989            };
2990
2991            // remove peer from all mappings
2992            for topic in &connected_peer.topics {
2993                // check the mesh for the topic
2994                if let Some(mesh_peers) = self.mesh.get_mut(topic) {
2995                    // check if the peer is in the mesh and remove it
2996                    if mesh_peers.remove(&peer_id) {
2997                        if let Some(m) = self.metrics.as_mut() {
2998                            m.peers_removed(topic, Churn::Dc, 1);
2999                            m.set_mesh_peers(topic, mesh_peers.len());
3000                        }
3001                    };
3002                }
3003
3004                if let Some(m) = self.metrics.as_mut() {
3005                    m.dec_topic_peers(topic);
3006                }
3007
3008                // remove from fanout
3009                if let Some(peers) = self.fanout.get_mut(topic) {
3010                    peers.remove(&peer_id);
3011                }
3012            }
3013
3014            // Forget px and outbound status for this peer
3015            self.px_peers.remove(&peer_id);
3016            self.outbound_peers.remove(&peer_id);
3017
3018            // If metrics are enabled, register the disconnection of a peer based on its protocol.
3019            if let Some(metrics) = self.metrics.as_mut() {
3020                metrics.peer_protocol_disconnected(connected_peer.kind);
3021            }
3022
3023            self.connected_peers.remove(&peer_id);
3024
3025            if let Some((peer_score, ..)) = &mut self.peer_score {
3026                peer_score.remove_peer(&peer_id);
3027            }
3028        }
3029    }
3030
3031    fn on_address_change(
3032        &mut self,
3033        AddressChange {
3034            peer_id,
3035            old: endpoint_old,
3036            new: endpoint_new,
3037            ..
3038        }: AddressChange,
3039    ) {
3040        // Replace the old IP with the new one in the peer scoring system
3041        if let Some((peer_score, ..)) = &mut self.peer_score {
3042            if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
3043                peer_score.remove_ip(&peer_id, &ip);
3044            } else {
3045                tracing::trace!(
3046                    peer=%&peer_id,
3047                    "Couldn't extract ip address from the peer's endpoint {:?}",
3048                    endpoint_old
3049                )
3050            }
3051            if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
3052                peer_score.add_ip(&peer_id, ip);
3053            } else {
3054                tracing::trace!(
3055                    peer=%peer_id,
3056                    "Couldn't extract ip address from the peer's endpoint {:?}",
3057                    endpoint_new
3058                )
3059            }
3060        }
3061    }
3062}
3063
3064fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
3065    addr.iter().find_map(|p| match p {
3066        Ip4(addr) => Some(IpAddr::V4(addr)),
3067        Ip6(addr) => Some(IpAddr::V6(addr)),
3068        _ => None,
3069    })
3070}
3071
3072impl<C, F> NetworkBehaviour for Behaviour<C, F>
3073where
3074    C: Send + 'static + DataTransform,
3075    F: Send + 'static + TopicSubscriptionFilter,
3076{
3077    type ConnectionHandler = Handler;
3078    type ToSwarm = Event;
3079
3080    fn handle_established_inbound_connection(
3081        &mut self,
3082        connection_id: ConnectionId,
3083        peer_id: PeerId,
3084        _: &Multiaddr,
3085        _: &Multiaddr,
3086    ) -> Result<THandler<Self>, ConnectionDenied> {
3087        // By default we assume a peer is only a floodsub peer.
3088        //
3089        // The protocol negotiation occurs once a message is sent/received. Once this happens we
3090        // update the type of peer that this is in order to determine which kind of routing should
3091        // occur.
3092        let connected_peer = self
3093            .connected_peers
3094            .entry(peer_id)
3095            .or_insert(PeerConnections {
3096                kind: PeerKind::Floodsub,
3097                connections: vec![],
3098                sender: Sender::new(self.config.connection_handler_queue_len()),
3099                topics: Default::default(),
3100                dont_send: LinkedHashMap::new(),
3101            });
3102        // Add the new connection
3103        connected_peer.connections.push(connection_id);
3104
3105        Ok(Handler::new(
3106            self.config.protocol_config(),
3107            connected_peer.sender.new_receiver(),
3108        ))
3109    }
3110
3111    fn handle_established_outbound_connection(
3112        &mut self,
3113        connection_id: ConnectionId,
3114        peer_id: PeerId,
3115        _: &Multiaddr,
3116        _: Endpoint,
3117        _: PortUse,
3118    ) -> Result<THandler<Self>, ConnectionDenied> {
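        // As with inbound connections, assume the peer is a Floodsub peer until protocol
        // negotiation tells us otherwise.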
3119        let connected_peer = self
3120            .connected_peers
3121            .entry(peer_id)
3122            .or_insert(PeerConnections {
3123                kind: PeerKind::Floodsub,
3124                connections: vec![],
3125                sender: Sender::new(self.config.connection_handler_queue_len()),
3126                topics: Default::default(),
3127                dont_send: LinkedHashMap::new(),
3128            });
3129        // Add the new connection
3130        connected_peer.connections.push(connection_id);
3131
3132        Ok(Handler::new(
3133            self.config.protocol_config(),
3134            connected_peer.sender.new_receiver(),
3135        ))
3136    }
3137
3138    fn on_connection_handler_event(
3139        &mut self,
3140        propagation_source: PeerId,
3141        _connection_id: ConnectionId,
3142        handler_event: THandlerOutEvent<Self>,
3143    ) {
3144        match handler_event {
3145            HandlerEvent::PeerKind(kind) => {
3146                // We have identified the protocol this peer is using
3147
3148                if let Some(metrics) = self.metrics.as_mut() {
3149                    metrics.peer_protocol_connected(kind);
3150                }
3151
3152                if let PeerKind::NotSupported = kind {
3153                    tracing::debug!(
3154                        peer=%propagation_source,
3155                        "Peer does not support gossipsub protocols"
3156                    );
3157                    self.events
3158                        .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
3159                            peer_id: propagation_source,
3160                        }));
3161                } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
3162                    // Only change the value if the old value is Floodsub (the default set in
3163                    // `handle_established_inbound_connection`/`handle_established_outbound_connection`).
3164                    // All other PeerKind changes are ignored.
3165                    tracing::debug!(
3166                        peer=%propagation_source,
3167                        peer_type=%kind,
3168                        "New peer type found for peer"
3169                    );
3170                    if let PeerKind::Floodsub = conn.kind {
3171                        conn.kind = kind;
3172                    }
3173                }
3174            }
3175            HandlerEvent::MessageDropped(rpc) => {
3176                // Account for this in the scoring logic
3177                if let Some((peer_score, _, _)) = &mut self.peer_score {
3178                    peer_score.failed_message_slow_peer(&propagation_source);
3179                }
3180
3181                // Keep track of expired messages for the application layer.
3182                let failed_messages = self.failed_messages.entry(propagation_source).or_default();
3183                failed_messages.timeout += 1;
3184                match rpc {
3185                    RpcOut::Publish { .. } => {
3186                        failed_messages.publish += 1;
3187                    }
3188                    RpcOut::Forward { .. } => {
3189                        failed_messages.forward += 1;
3190                    }
3191                    _ => {}
3192                }
3193
3194                // Record metrics on the failure.
3195                if let Some(metrics) = self.metrics.as_mut() {
3196                    match rpc {
3197                        RpcOut::Publish { message, .. } => {
3198                            metrics.publish_msg_dropped(&message.topic);
3199                            metrics.timeout_msg_dropped(&message.topic);
3200                        }
3201                        RpcOut::Forward { message, .. } => {
3202                            metrics.forward_msg_dropped(&message.topic);
3203                            metrics.timeout_msg_dropped(&message.topic);
3204                        }
3205                        _ => {}
3206                    }
3207                }
3208            }
3209            HandlerEvent::Message {
3210                rpc,
3211                invalid_messages,
3212            } => {
3213                // Handle the gossipsub RPC
3214
3215                // Handle subscriptions
3216                // Update connected peers topics
3217                if !rpc.subscriptions.is_empty() {
3218                    self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
3219                }
3220
3221                // Check if the peer is graylisted, in which case we ignore the event.
3222                if let (true, _) =
3223                    self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
3224                {
3225                    tracing::debug!(peer=%propagation_source, "RPC dropped from graylisted peer");
3226                    return;
3227                }
3228
3229                // Handle any invalid messages from this peer
3230                if self.peer_score.is_some() {
3231                    for (raw_message, validation_error) in invalid_messages {
3232                        self.handle_invalid_message(
3233                            &propagation_source,
3234                            &raw_message,
3235                            RejectReason::ValidationError(validation_error),
3236                        )
3237                    }
3238                } else {
3239                    // log the invalid messages
3240                    for (message, validation_error) in invalid_messages {
3241                        tracing::warn!(
3242                            peer=%propagation_source,
3243                            source=?message.source,
3244                            "Invalid message from peer. Reason: {:?}",
3245                            validation_error,
3246                        );
3247                    }
3248                }
3249
3250                // Handle messages
3251                for (count, raw_message) in rpc.messages.into_iter().enumerate() {
3252                    // Only process the amount of messages the configuration allows.
3253                    if self.config.max_messages_per_rpc().is_some()
3254                        && Some(count) >= self.config.max_messages_per_rpc()
3255                    {
3256                        tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
3257                        break;
3258                    }
3259                    self.handle_received_message(raw_message, &propagation_source);
3260                }
3261
3262                // Handle control messages
3263                // Group IHAVE, GRAFT and PRUNE control messages so they can be handled in
3264                // batches; this minimises SendEvents while keeping the per-event handling simple.
3265                let mut ihave_msgs = vec![];
3266                let mut graft_msgs = vec![];
3267                let mut prune_msgs = vec![];
3268                for control_msg in rpc.control_msgs {
3269                    match control_msg {
3270                        ControlAction::IHave(IHave {
3271                            topic_hash,
3272                            message_ids,
3273                        }) => {
3274                            ihave_msgs.push((topic_hash, message_ids));
3275                        }
3276                        ControlAction::IWant(IWant { message_ids }) => {
3277                            self.handle_iwant(&propagation_source, message_ids)
3278                        }
3279                        ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
3280                        ControlAction::Prune(Prune {
3281                            topic_hash,
3282                            peers,
3283                            backoff,
3284                        }) => prune_msgs.push((topic_hash, peers, backoff)),
3285                        ControlAction::IDontWant(IDontWant { message_ids }) => {
3286                            let Some(peer) = self.connected_peers.get_mut(&propagation_source)
3287                            else {
3288                                tracing::error!(peer = %propagation_source,
3289                                    "Could not handle IDONTWANT, peer doesn't exist in connected peer list");
3290                                continue;
3291                            };
3292                            if let Some(metrics) = self.metrics.as_mut() {
3293                                metrics.register_idontwant(message_ids.len());
3294                            }
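                            // `dont_send` keeps insertion order, so popping from the front below
                            // evicts the oldest entries once the cap is exceeded.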
3295                            for message_id in message_ids {
3296                                peer.dont_send.insert(message_id, Instant::now());
3297                                // Don't exceed capacity.
3298                                if peer.dont_send.len() > IDONTWANT_CAP {
3299                                    peer.dont_send.pop_front();
3300                                }
3301                            }
3302                        }
3303                    }
3304                }
3305                if !ihave_msgs.is_empty() {
3306                    self.handle_ihave(&propagation_source, ihave_msgs);
3307                }
3308                if !graft_msgs.is_empty() {
3309                    self.handle_graft(&propagation_source, graft_msgs);
3310                }
3311                if !prune_msgs.is_empty() {
3312                    self.handle_prune(&propagation_source, prune_msgs);
3313                }
3314            }
3315        }
3316    }
3317
3318    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
3319    fn poll(
3320        &mut self,
3321        cx: &mut Context<'_>,
3322    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
3323        if let Some(event) = self.events.pop_front() {
3324            return Poll::Ready(event);
3325        }
3326
3327        // update scores
3328        if let Some((peer_score, _, delay)) = &mut self.peer_score {
3329            if delay.poll_unpin(cx).is_ready() {
3330                peer_score.refresh_scores();
3331                delay.reset(peer_score.params.decay_interval);
3332            }
3333        }
3334
3335        if self.heartbeat.poll_unpin(cx).is_ready() {
3336            self.heartbeat();
3337            self.heartbeat.reset(self.config.heartbeat_interval());
3338        }
3339
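        // Nothing further to do; the timers polled above have registered wakers, so this
        // behaviour will be polled again when the next score refresh or heartbeat is due.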
3340        Poll::Pending
3341    }
3342
3343    fn on_swarm_event(&mut self, event: FromSwarm) {
3344        match event {
3345            FromSwarm::ConnectionEstablished(connection_established) => {
3346                self.on_connection_established(connection_established)
3347            }
3348            FromSwarm::ConnectionClosed(connection_closed) => {
3349                self.on_connection_closed(connection_closed)
3350            }
3351            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
3352            _ => {}
3353        }
3354    }
3355}
3356
3357/// This is called when peers are added to any mesh. It checks if the peer existed
3358/// in any other mesh. If this is the first mesh they have joined, it queues a message to notify
3359/// the appropriate connection handler to maintain a connection.
3360fn peer_added_to_mesh(
3361    peer_id: PeerId,
3362    new_topics: Vec<&TopicHash>,
3363    mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3364    events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3365    connections: &HashMap<PeerId, PeerConnections>,
3366) {
3367    // Ensure there is an active connection
3368    let connection_id = match connections.get(&peer_id) {
3369        Some(p) => p
3370            .connections
3371            .first()
3372            .expect("There should be at least one connection to a peer."),
3373        None => {
3374            tracing::error!(peer_id=%peer_id, "Peer non-existent when added to the mesh");
3375            return;
3376        }
3377    };
3378
3379    if let Some(peer) = connections.get(&peer_id) {
3380        for topic in &peer.topics {
3381            if !new_topics.contains(&topic) {
3382                if let Some(mesh_peers) = mesh.get(topic) {
3383                    if mesh_peers.contains(&peer_id) {
3384                        // the peer is already in a mesh for another topic
3385                        return;
3386                    }
3387                }
3388            }
3389        }
3390    }
3391    // This is the first mesh the peer has joined, inform the handler
3392    events.push_back(ToSwarm::NotifyHandler {
3393        peer_id,
3394        event: HandlerIn::JoinedMesh,
3395        handler: NotifyHandler::One(*connection_id),
3396    });
3397}
3398
3399/// This is called when peers are removed from a mesh. It checks if the peer exists
3400/// in any other mesh. If this was the last mesh they were part of, it queues a message to
3401/// notify the appropriate connection handler that it no longer needs to maintain the connection.
3402fn peer_removed_from_mesh(
3403    peer_id: PeerId,
3404    old_topic: &TopicHash,
3405    mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3406    events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3407    connections: &HashMap<PeerId, PeerConnections>,
3408) {
3409    // Ensure there is an active connection
3410    let connection_id = match connections.get(&peer_id) {
3411        Some(p) => p
3412            .connections
3413            .first()
3414            .expect("There should be at least one connection to a peer."),
3415        None => {
3416            tracing::error!(peer_id=%peer_id, "Peer non-existent when removed from the mesh");
3417            return;
3418        }
3419    };
3420
3421    if let Some(peer) = connections.get(&peer_id) {
3422        for topic in &peer.topics {
3423            if topic != old_topic {
3424                if let Some(mesh_peers) = mesh.get(topic) {
3425                    if mesh_peers.contains(&peer_id) {
3426                        // the peer exists in another mesh still
3427                        return;
3428                    }
3429                }
3430            }
3431        }
3432    }
3433    // The peer is not in any other mesh, inform the handler
3434    events.push_back(ToSwarm::NotifyHandler {
3435        peer_id,
3436        event: HandlerIn::LeftMesh,
3437        handler: NotifyHandler::One(*connection_id),
3438    });
3439}
3440
3441/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
3442/// filtered by the function `f`. The number of peers returned equals the output of `n_map`,
3443/// which takes the number of filtered peers as its input.
3444fn get_random_peers_dynamic(
3445    connected_peers: &HashMap<PeerId, PeerConnections>,
3446    topic_hash: &TopicHash,
3447    // maps the number of total peers to the number of selected peers
3448    n_map: impl Fn(usize) -> usize,
3449    mut f: impl FnMut(&PeerId) -> bool,
3450) -> BTreeSet<PeerId> {
3451    let mut gossip_peers = connected_peers
3452        .iter()
3453        .filter(|(_, p)| p.topics.contains(topic_hash))
3454        .filter(|(peer_id, _)| f(peer_id))
3455        .filter(|(_, p)| p.kind.is_gossipsub())
3456        .map(|(peer_id, _)| *peer_id)
3457        .collect::<Vec<PeerId>>();
3458
3459    // if we have less than needed, return them
3460    let n = n_map(gossip_peers.len());
3461    if gossip_peers.len() <= n {
3462        tracing::debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3463        return gossip_peers.into_iter().collect();
3464    }
3465
3466    // we have more peers than needed, shuffle them and return n of them
3467    let mut rng = thread_rng();
3468    gossip_peers.partial_shuffle(&mut rng, n);
3469
3470    tracing::debug!("RANDOM PEERS: Got {:?} peers", n);
3471
3472    gossip_peers.into_iter().take(n).collect()
3473}
3474
3475/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
3476/// filtered by the function `f`.
3477fn get_random_peers(
3478    connected_peers: &HashMap<PeerId, PeerConnections>,
3479    topic_hash: &TopicHash,
3480    n: usize,
3481    f: impl FnMut(&PeerId) -> bool,
3482) -> BTreeSet<PeerId> {
3483    get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
3484}
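// Illustrative sketch (not part of this crate): a typical call selects up to `n` gossipsub
// peers subscribed to `topic` that are not already in the mesh. `topic`, `n` and
// `mesh_peers` are hypothetical placeholders for values available at the call site:
//
//     let chosen = get_random_peers(&self.connected_peers, &topic, n, |peer| {
//         !mesh_peers.contains(peer)
//     });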
3485
3486/// Validates the combination of signing, privacy and message validation to ensure the
3487/// configuration will not reject published messages.
3488fn validate_config(
3489    authenticity: &MessageAuthenticity,
3490    validation_mode: &ValidationMode,
3491) -> Result<(), &'static str> {
3492    match validation_mode {
3493        ValidationMode::Anonymous => {
3494            if authenticity.is_signing() {
3495                return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3496            }
3497
3498            if !authenticity.is_anonymous() {
3499                return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3500            }
3501        }
3502        ValidationMode::Strict => {
3503            if !authenticity.is_signing() {
3504                return Err(
3505                    "Messages will be published unsigned and incoming unsigned messages \
3506                    will be rejected. Consider adjusting the validation or privacy settings \
3507                    in the config"
3508                );
3509            }
3510        }
3511        _ => {}
3512    }
3513    Ok(())
3514}
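// Illustrative sketch (not part of this crate) of combinations accepted by `validate_config`,
// assuming a hypothetical `keypair: Keypair`:
//
//     validate_config(&MessageAuthenticity::Signed(keypair), &ValidationMode::Strict)?;
//     validate_config(&MessageAuthenticity::Anonymous, &ValidationMode::Anonymous)?;
//
// Signing combined with `ValidationMode::Anonymous`, or anonymous publishing combined with
// `ValidationMode::Strict`, is rejected above.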
3515
3516impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
3517    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3518        f.debug_struct("Behaviour")
3519            .field("config", &self.config)
3520            .field("events", &self.events.len())
3521            .field("publish_config", &self.publish_config)
3522            .field("mesh", &self.mesh)
3523            .field("fanout", &self.fanout)
3524            .field("fanout_last_pub", &self.fanout_last_pub)
3525            .field("mcache", &self.mcache)
3526            .field("heartbeat", &self.heartbeat)
3527            .finish()
3528    }
3529}
3530
3531impl fmt::Debug for PublishConfig {
3532    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3533        match self {
3534            PublishConfig::Signing { author, .. } => {
3535                f.write_fmt(format_args!("PublishConfig::Signing({author})"))
3536            }
3537            PublishConfig::Author(author) => {
3538                f.write_fmt(format_args!("PublishConfig::Author({author})"))
3539            }
3540            PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3541            PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3542        }
3543    }
3544}