1use std::{
22 cmp::{max, Ordering, Ordering::Equal},
23 collections::{BTreeSet, HashMap, HashSet, VecDeque},
24 fmt,
25 fmt::Debug,
26 net::IpAddr,
27 task::{Context, Poll},
28 time::Duration,
29};
30
31use futures::FutureExt;
32use futures_timer::Delay;
33use hashlink::LinkedHashMap;
34use libp2p_core::{
35 multiaddr::Protocol::{Ip4, Ip6},
36 transport::PortUse,
37 Endpoint, Multiaddr,
38};
39use libp2p_identity::{Keypair, PeerId};
40use libp2p_swarm::{
41 behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
42 dial_opts::DialOpts,
43 ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
44 THandlerOutEvent, ToSwarm,
45};
46use prometheus_client::registry::Registry;
47use quick_protobuf::{MessageWrite, Writer};
48use rand::{seq::SliceRandom, thread_rng};
49use web_time::{Instant, SystemTime};
50
51use crate::{
52 backoff::BackoffStorage,
53 config::{Config, ValidationMode},
54 gossip_promises::GossipPromises,
55 handler::{Handler, HandlerEvent, HandlerIn},
56 mcache::MessageCache,
57 metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
58 peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason},
59 protocol::SIGNING_PREFIX,
60 rpc::Sender,
61 rpc_proto::proto,
62 subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
63 time_cache::DuplicateCache,
64 topic::{Hasher, Topic, TopicHash},
65 transform::{DataTransform, IdentityTransform},
66 types::{
67 ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId,
68 PeerConnections, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription,
69 SubscriptionAction,
70 },
71 FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
72};
73
74#[cfg(test)]
75mod tests;
76
77const IDONTWANT_CAP: usize = 10_000;
79
80const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0);
82
83#[derive(Clone)]
90pub enum MessageAuthenticity {
91 Signed(Keypair),
94 Author(PeerId),
99 RandomAuthor,
104 Anonymous,
114}
115
116impl MessageAuthenticity {
117 pub fn is_signing(&self) -> bool {
119 matches!(self, MessageAuthenticity::Signed(_))
120 }
121
122 pub fn is_anonymous(&self) -> bool {
123 matches!(self, MessageAuthenticity::Anonymous)
124 }
125}
126
127#[derive(Debug)]
129pub enum Event {
130 Message {
132 propagation_source: PeerId,
134 message_id: MessageId,
137 message: Message,
139 },
140 Subscribed {
142 peer_id: PeerId,
144 topic: TopicHash,
146 },
147 Unsubscribed {
149 peer_id: PeerId,
151 topic: TopicHash,
153 },
154 GossipsubNotSupported { peer_id: PeerId },
156 SlowPeer {
158 peer_id: PeerId,
160 failed_messages: FailedMessages,
162 },
163}
164
165#[allow(clippy::large_enum_variant)]
168enum PublishConfig {
169 Signing {
170 keypair: Keypair,
171 author: PeerId,
172 inline_key: Option<Vec<u8>>,
173 last_seq_no: SequenceNumber,
174 },
175 Author(PeerId),
176 RandomAuthor,
177 Anonymous,
178}
179
180#[derive(Debug)]
184struct SequenceNumber(u64);
185
186impl SequenceNumber {
187 fn new() -> Self {
188 let unix_timestamp = SystemTime::now()
189 .duration_since(SystemTime::UNIX_EPOCH)
190 .expect("time to be linear")
191 .as_nanos();
192
193 Self(unix_timestamp as u64)
194 }
195
196 fn next(&mut self) -> u64 {
197 self.0 = self
198 .0
199 .checked_add(1)
200 .expect("to not exhaust u64 space for sequence numbers");
201
202 self.0
203 }
204}
205
206impl PublishConfig {
207 pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
208 match self {
209 Self::Signing { author, .. } => Some(author),
210 Self::Author(author) => Some(author),
211 _ => None,
212 }
213 }
214}
215
216impl From<MessageAuthenticity> for PublishConfig {
217 fn from(authenticity: MessageAuthenticity) -> Self {
218 match authenticity {
219 MessageAuthenticity::Signed(keypair) => {
220 let public_key = keypair.public();
221 let key_enc = public_key.encode_protobuf();
222 let key = if key_enc.len() <= 42 {
223 None
227 } else {
228 Some(key_enc)
230 };
231
232 PublishConfig::Signing {
233 keypair,
234 author: public_key.to_peer_id(),
235 inline_key: key,
236 last_seq_no: SequenceNumber::new(),
237 }
238 }
239 MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
240 MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
241 MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
242 }
243 }
244}
245
246pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
258 config: Config,
260
261 events: VecDeque<ToSwarm<Event, HandlerIn>>,
263
264 publish_config: PublishConfig,
266
267 duplicate_cache: DuplicateCache<MessageId>,
270
271 connected_peers: HashMap<PeerId, PeerConnections>,
274
275 explicit_peers: HashSet<PeerId>,
278
279 blacklisted_peers: HashSet<PeerId>,
282
283 mesh: HashMap<TopicHash, BTreeSet<PeerId>>,
285
286 fanout: HashMap<TopicHash, BTreeSet<PeerId>>,
288
289 fanout_last_pub: HashMap<TopicHash, Instant>,
291
292 backoffs: BackoffStorage,
294
295 mcache: MessageCache,
297
298 heartbeat: Delay,
300
301 heartbeat_ticks: u64,
304
305 px_peers: HashSet<PeerId>,
310
311 outbound_peers: HashSet<PeerId>,
314
315 peer_score: Option<(PeerScore, PeerScoreThresholds, Delay)>,
318
319 count_received_ihave: HashMap<PeerId, usize>,
321
322 count_sent_iwant: HashMap<PeerId, usize>,
324
325 published_message_ids: DuplicateCache<MessageId>,
328
329 subscription_filter: F,
331
332 data_transform: D,
336
337 metrics: Option<Metrics>,
339
340 failed_messages: HashMap<PeerId, FailedMessages>,
342
343 gossip_promises: GossipPromises,
345}
346
347impl<D, F> Behaviour<D, F>
348where
349 D: DataTransform + Default,
350 F: TopicSubscriptionFilter + Default,
351{
352 pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
355 Self::new_with_subscription_filter_and_transform(
356 privacy,
357 config,
358 None,
359 F::default(),
360 D::default(),
361 )
362 }
363
364 pub fn new_with_metrics(
368 privacy: MessageAuthenticity,
369 config: Config,
370 metrics_registry: &mut Registry,
371 metrics_config: MetricsConfig,
372 ) -> Result<Self, &'static str> {
373 Self::new_with_subscription_filter_and_transform(
374 privacy,
375 config,
376 Some((metrics_registry, metrics_config)),
377 F::default(),
378 D::default(),
379 )
380 }
381}
382
383impl<D, F> Behaviour<D, F>
384where
385 D: DataTransform + Default,
386 F: TopicSubscriptionFilter,
387{
388 pub fn new_with_subscription_filter(
391 privacy: MessageAuthenticity,
392 config: Config,
393 metrics: Option<(&mut Registry, MetricsConfig)>,
394 subscription_filter: F,
395 ) -> Result<Self, &'static str> {
396 Self::new_with_subscription_filter_and_transform(
397 privacy,
398 config,
399 metrics,
400 subscription_filter,
401 D::default(),
402 )
403 }
404}
405
406impl<D, F> Behaviour<D, F>
407where
408 D: DataTransform,
409 F: TopicSubscriptionFilter + Default,
410{
411 pub fn new_with_transform(
414 privacy: MessageAuthenticity,
415 config: Config,
416 metrics: Option<(&mut Registry, MetricsConfig)>,
417 data_transform: D,
418 ) -> Result<Self, &'static str> {
419 Self::new_with_subscription_filter_and_transform(
420 privacy,
421 config,
422 metrics,
423 F::default(),
424 data_transform,
425 )
426 }
427}
428
429impl<D, F> Behaviour<D, F>
430where
431 D: DataTransform,
432 F: TopicSubscriptionFilter,
433{
434 pub fn new_with_subscription_filter_and_transform(
437 privacy: MessageAuthenticity,
438 config: Config,
439 metrics: Option<(&mut Registry, MetricsConfig)>,
440 subscription_filter: F,
441 data_transform: D,
442 ) -> Result<Self, &'static str> {
443 validate_config(&privacy, config.validation_mode())?;
448
449 Ok(Behaviour {
450 metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
451 events: VecDeque::new(),
452 publish_config: privacy.into(),
453 duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
454 explicit_peers: HashSet::new(),
455 blacklisted_peers: HashSet::new(),
456 mesh: HashMap::new(),
457 fanout: HashMap::new(),
458 fanout_last_pub: HashMap::new(),
459 backoffs: BackoffStorage::new(
460 &config.prune_backoff(),
461 config.heartbeat_interval(),
462 config.backoff_slack(),
463 ),
464 mcache: MessageCache::new(config.history_gossip(), config.history_length()),
465 heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
466 heartbeat_ticks: 0,
467 px_peers: HashSet::new(),
468 outbound_peers: HashSet::new(),
469 peer_score: None,
470 count_received_ihave: HashMap::new(),
471 count_sent_iwant: HashMap::new(),
472 connected_peers: HashMap::new(),
473 published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
474 config,
475 subscription_filter,
476 data_transform,
477 failed_messages: Default::default(),
478 gossip_promises: Default::default(),
479 })
480 }
481}
482
483impl<D, F> Behaviour<D, F>
484where
485 D: DataTransform + Send + 'static,
486 F: TopicSubscriptionFilter + Send + 'static,
487{
488 pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
490 self.mesh.keys()
491 }
492
493 pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
495 self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
496 }
497
498 pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
499 let mut res = BTreeSet::new();
500 for peers in self.mesh.values() {
501 res.extend(peers);
502 }
503 res.into_iter()
504 }
505
506 pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
508 self.connected_peers
509 .iter()
510 .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
511 }
512
513 pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
515 self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
516 }
517
518 pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
520 self.peer_score
521 .as_ref()
522 .map(|(score, ..)| score.score(peer_id))
523 }
524
525 pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
530 tracing::debug!(%topic, "Subscribing to topic");
531 let topic_hash = topic.hash();
532 if !self.subscription_filter.can_subscribe(&topic_hash) {
533 return Err(SubscriptionError::NotAllowed);
534 }
535
536 if self.mesh.contains_key(&topic_hash) {
537 tracing::debug!(%topic, "Topic is already in the mesh");
538 return Ok(false);
539 }
540
541 for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
543 tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
544 let event = RpcOut::Subscribe(topic_hash.clone());
545 self.send_message(peer_id, event);
546 }
547
548 self.join(&topic_hash);
551 tracing::debug!(%topic, "Subscribed to topic");
552 Ok(true)
553 }
554
555 pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
559 tracing::debug!(%topic, "Unsubscribing from topic");
560 let topic_hash = topic.hash();
561
562 if !self.mesh.contains_key(&topic_hash) {
563 tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
564 return false;
566 }
567
568 for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
570 tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
571 let event = RpcOut::Unsubscribe(topic_hash.clone());
572 self.send_message(peer, event);
573 }
574
575 self.leave(&topic_hash);
578
579 tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
580 true
581 }
582
583 pub fn publish(
585 &mut self,
586 topic: impl Into<TopicHash>,
587 data: impl Into<Vec<u8>>,
588 ) -> Result<MessageId, PublishError> {
589 let data = data.into();
590 let topic = topic.into();
591
592 let transformed_data = self
594 .data_transform
595 .outbound_transform(&topic, data.clone())?;
596
597 if transformed_data.len() > self.config.max_transmit_size() {
599 return Err(PublishError::MessageTooLarge);
600 }
601
602 let raw_message = self.build_raw_message(topic, transformed_data)?;
603
604 let msg_id = self.config.message_id(&Message {
606 source: raw_message.source,
607 data, sequence_number: raw_message.sequence_number,
609 topic: raw_message.topic.clone(),
610 });
611
612 if self.duplicate_cache.contains(&msg_id) {
614 tracing::warn!(
617 message=%msg_id,
618 "Not publishing a message that has already been published"
619 );
620 return Err(PublishError::Duplicate);
621 }
622
623 tracing::trace!(message=%msg_id, "Publishing message");
624
625 let topic_hash = raw_message.topic.clone();
626
627 let mut peers_on_topic = self
628 .connected_peers
629 .iter()
630 .filter(|(_, p)| p.topics.contains(&topic_hash))
631 .map(|(peer_id, _)| peer_id)
632 .peekable();
633
634 if peers_on_topic.peek().is_none() {
635 return Err(PublishError::InsufficientPeers);
636 }
637
638 let mut recipient_peers = HashSet::new();
639 if self.config.flood_publish() {
640 recipient_peers.extend(peers_on_topic.filter(|p| {
642 self.explicit_peers.contains(*p)
643 || !self.score_below_threshold(p, |ts| ts.publish_threshold).0
644 }));
645 } else {
646 match self.mesh.get(&topic_hash) {
647 Some(mesh_peers) => {
649 let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len());
652
653 if needed_extra_peers > 0 {
654 let peer_list = get_random_peers(
659 &self.connected_peers,
660 &topic_hash,
661 needed_extra_peers,
662 |peer| {
663 !mesh_peers.contains(peer)
664 && !self.explicit_peers.contains(peer)
665 && !self
666 .score_below_threshold(peer, |pst| pst.publish_threshold)
667 .0
668 },
669 );
670 recipient_peers.extend(peer_list);
671 }
672
673 recipient_peers.extend(mesh_peers);
674 }
675 None => {
677 tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
678 let fanout_peers = self
680 .fanout
681 .get(&topic_hash)
682 .filter(|peers| !peers.is_empty());
683 if let Some(peers) = fanout_peers {
685 for peer in peers {
686 recipient_peers.insert(*peer);
687 }
688 } else {
689 let mesh_n = self.config.mesh_n();
691 let new_peers =
692 get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
693 |p| {
694 !self.explicit_peers.contains(p)
695 && !self
696 .score_below_threshold(p, |pst| pst.publish_threshold)
697 .0
698 }
699 });
700 self.fanout.insert(topic_hash.clone(), new_peers.clone());
702 for peer in new_peers {
703 tracing::debug!(%peer, "Peer added to fanout");
704 recipient_peers.insert(peer);
705 }
706 }
707 self.fanout_last_pub
709 .insert(topic_hash.clone(), Instant::now());
710 }
711 }
712
713 recipient_peers
715 .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));
716
717 for (peer, connections) in &self.connected_peers {
719 if connections.kind == PeerKind::Floodsub
720 && !self
721 .score_below_threshold(peer, |ts| ts.publish_threshold)
722 .0
723 {
724 recipient_peers.insert(*peer);
725 }
726 }
727 }
728
729 self.duplicate_cache.insert(msg_id.clone());
732 self.mcache.put(&msg_id, raw_message.clone());
733
734 if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
737 if !self.config.allow_self_origin() {
738 self.published_message_ids.insert(msg_id.clone());
739 }
740 }
741
742 let mut publish_failed = true;
744 for peer_id in recipient_peers.iter() {
745 tracing::trace!(peer=%peer_id, "Sending message to peer");
746 if self.send_message(
747 *peer_id,
748 RpcOut::Publish {
749 message: raw_message.clone(),
750 timeout: Delay::new(self.config.publish_queue_duration()),
751 },
752 ) {
753 publish_failed = false
754 }
755 }
756
757 if recipient_peers.is_empty() {
758 return Err(PublishError::InsufficientPeers);
759 }
760
761 if publish_failed {
762 return Err(PublishError::AllQueuesFull(recipient_peers.len()));
763 }
764
765 if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
767 && self.config.idontwant_on_publish()
768 {
769 self.send_idontwant(&raw_message, &msg_id, raw_message.source.as_ref());
770 }
771
772 tracing::debug!(message=%msg_id, "Published message");
773
774 if let Some(metrics) = self.metrics.as_mut() {
775 metrics.register_published_message(&topic_hash);
776 }
777
778 Ok(msg_id)
779 }
780
781 pub fn report_message_validation_result(
801 &mut self,
802 msg_id: &MessageId,
803 propagation_source: &PeerId,
804 acceptance: MessageAcceptance,
805 ) -> bool {
806 let reject_reason = match acceptance {
807 MessageAcceptance::Accept => {
808 let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
809 Some((raw_message, originating_peers)) => {
810 (raw_message.clone(), originating_peers)
811 }
812 None => {
813 tracing::warn!(
814 message=%msg_id,
815 "Message not in cache. Ignoring forwarding"
816 );
817 if let Some(metrics) = self.metrics.as_mut() {
818 metrics.memcache_miss();
819 }
820 return false;
821 }
822 };
823
824 if let Some(metrics) = self.metrics.as_mut() {
825 metrics.register_msg_validation(&raw_message.topic, &acceptance);
826 }
827
828 self.forward_msg(
829 msg_id,
830 raw_message,
831 Some(propagation_source),
832 originating_peers,
833 );
834 return true;
835 }
836 MessageAcceptance::Reject => RejectReason::ValidationFailed,
837 MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
838 };
839
840 if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
841 if let Some(metrics) = self.metrics.as_mut() {
842 metrics.register_msg_validation(&raw_message.topic, &acceptance);
843 }
844
845 if let Some((peer_score, ..)) = &mut self.peer_score {
848 peer_score.reject_message(
849 propagation_source,
850 msg_id,
851 &raw_message.topic,
852 reject_reason,
853 );
854 for peer in originating_peers.iter() {
855 peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
856 }
857 }
858 true
859 } else {
860 tracing::warn!(message=%msg_id, "Rejected message not in cache");
861 false
862 }
863 }
864
865 pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
867 tracing::debug!(peer=%peer_id, "Adding explicit peer");
868
869 self.explicit_peers.insert(*peer_id);
870
871 self.check_explicit_peer_connection(peer_id);
872 }
873
874 pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
877 tracing::debug!(peer=%peer_id, "Removing explicit peer");
878 self.explicit_peers.remove(peer_id);
879 }
880
881 pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
884 if self.blacklisted_peers.insert(*peer_id) {
885 tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
886 }
887 }
888
889 pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
891 if self.blacklisted_peers.remove(peer_id) {
892 tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
893 }
894 }
895
896 pub fn with_peer_score(
900 &mut self,
901 params: PeerScoreParams,
902 threshold: PeerScoreThresholds,
903 ) -> Result<(), String> {
904 self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
905 }
906
907 pub fn with_peer_score_and_message_delivery_time_callback(
910 &mut self,
911 params: PeerScoreParams,
912 threshold: PeerScoreThresholds,
913 callback: Option<fn(&PeerId, &TopicHash, f64)>,
914 ) -> Result<(), String> {
915 params.validate()?;
916 threshold.validate()?;
917
918 if self.peer_score.is_some() {
919 return Err("Peer score set twice".into());
920 }
921
922 let interval = Delay::new(params.decay_interval);
923 let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
924 self.peer_score = Some((peer_score, threshold, interval));
925 Ok(())
926 }
927
928 pub fn set_topic_params<H: Hasher>(
932 &mut self,
933 topic: Topic<H>,
934 params: TopicScoreParams,
935 ) -> Result<(), &'static str> {
936 if let Some((peer_score, ..)) = &mut self.peer_score {
937 peer_score.set_topic_params(topic.hash(), params);
938 Ok(())
939 } else {
940 Err("Peer score must be initialised with `with_peer_score()`")
941 }
942 }
943
944 pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
946 self.peer_score.as_ref()?.0.get_topic_params(&topic.hash())
947 }
948
949 pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
952 if let Some((peer_score, ..)) = &mut self.peer_score {
953 peer_score.set_application_score(peer_id, new_score)
954 } else {
955 false
956 }
957 }
958
959 fn join(&mut self, topic_hash: &TopicHash) {
961 tracing::debug!(topic=%topic_hash, "Running JOIN for topic");
962
963 if self.mesh.contains_key(topic_hash) {
965 tracing::debug!(topic=%topic_hash, "JOIN: The topic is already in the mesh, ignoring JOIN");
966 return;
967 }
968
969 let mut added_peers = HashSet::new();
970
971 if let Some(m) = self.metrics.as_mut() {
972 m.joined(topic_hash)
973 }
974
975 if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
978 tracing::debug!(
979 topic=%topic_hash,
980 "JOIN: Removing peers from the fanout for topic"
981 );
982
983 peers.retain(|p| {
985 !self.explicit_peers.contains(p)
986 && !self.score_below_threshold(p, |_| 0.0).0
987 && !self.backoffs.is_backoff_with_slack(topic_hash, p)
988 });
989
990 let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
993 tracing::debug!(
994 topic=%topic_hash,
995 "JOIN: Adding {:?} peers from the fanout for topic",
996 add_peers
997 );
998 added_peers.extend(peers.iter().take(add_peers));
999
1000 self.mesh.insert(
1001 topic_hash.clone(),
1002 peers.into_iter().take(add_peers).collect(),
1003 );
1004
1005 self.fanout_last_pub.remove(topic_hash);
1007 }
1008
1009 let fanaout_added = added_peers.len();
1010 if let Some(m) = self.metrics.as_mut() {
1011 m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added)
1012 }
1013
1014 if added_peers.len() < self.config.mesh_n() {
1016 let new_peers = get_random_peers(
1018 &self.connected_peers,
1019 topic_hash,
1020 self.config.mesh_n() - added_peers.len(),
1021 |peer| {
1022 !added_peers.contains(peer)
1023 && !self.explicit_peers.contains(peer)
1024 && !self.score_below_threshold(peer, |_| 0.0).0
1025 && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
1026 },
1027 );
1028 added_peers.extend(new_peers.clone());
1029 tracing::debug!(
1031 "JOIN: Inserting {:?} random peers into the mesh",
1032 new_peers.len()
1033 );
1034 let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default();
1035 mesh_peers.extend(new_peers);
1036 }
1037
1038 let random_added = added_peers.len() - fanaout_added;
1039 if let Some(m) = self.metrics.as_mut() {
1040 m.peers_included(topic_hash, Inclusion::Random, random_added)
1041 }
1042
1043 for peer_id in added_peers {
1044 tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
1046 if let Some((peer_score, ..)) = &mut self.peer_score {
1047 peer_score.graft(&peer_id, topic_hash.clone());
1048 }
1049 self.send_message(
1050 peer_id,
1051 RpcOut::Graft(Graft {
1052 topic_hash: topic_hash.clone(),
1053 }),
1054 );
1055
1056 peer_added_to_mesh(
1058 peer_id,
1059 vec![topic_hash],
1060 &self.mesh,
1061 &mut self.events,
1062 &self.connected_peers,
1063 );
1064 }
1065
1066 let mesh_peers = self.mesh_peers(topic_hash).count();
1067 if let Some(m) = self.metrics.as_mut() {
1068 m.set_mesh_peers(topic_hash, mesh_peers)
1069 }
1070
1071 tracing::debug!(topic=%topic_hash, "Completed JOIN for topic");
1072 }
1073
1074 fn make_prune(
1076 &mut self,
1077 topic_hash: &TopicHash,
1078 peer: &PeerId,
1079 do_px: bool,
1080 on_unsubscribe: bool,
1081 ) -> Prune {
1082 if let Some((peer_score, ..)) = &mut self.peer_score {
1083 peer_score.prune(peer, topic_hash.clone());
1084 }
1085
1086 match self.connected_peers.get(peer).map(|v| &v.kind) {
1087 Some(PeerKind::Floodsub) => {
1088 tracing::error!("Attempted to prune a Floodsub peer");
1089 }
1090 Some(PeerKind::Gossipsub) => {
1091 return Prune {
1093 topic_hash: topic_hash.clone(),
1094 peers: Vec::new(),
1095 backoff: None,
1096 };
1097 }
1098 None => {
1099 tracing::error!("Attempted to Prune an unknown peer");
1100 }
1101 _ => {} }
1103
1104 let peers = if do_px {
1106 get_random_peers(
1107 &self.connected_peers,
1108 topic_hash,
1109 self.config.prune_peers(),
1110 |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
1111 )
1112 .into_iter()
1113 .map(|p| PeerInfo { peer_id: Some(p) })
1114 .collect()
1115 } else {
1116 Vec::new()
1117 };
1118
1119 let backoff = if on_unsubscribe {
1120 self.config.unsubscribe_backoff()
1121 } else {
1122 self.config.prune_backoff()
1123 };
1124
1125 self.backoffs.update_backoff(topic_hash, peer, backoff);
1127
1128 Prune {
1129 topic_hash: topic_hash.clone(),
1130 peers,
1131 backoff: Some(backoff.as_secs()),
1132 }
1133 }
1134
1135 fn leave(&mut self, topic_hash: &TopicHash) {
1137 tracing::debug!(topic=%topic_hash, "Running LEAVE for topic");
1138
1139 if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1141 if let Some(m) = self.metrics.as_mut() {
1142 m.left(topic_hash)
1143 }
1144 for peer_id in peers {
1145 tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
1147
1148 let on_unsubscribe = true;
1149 let prune =
1150 self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
1151 self.send_message(peer_id, RpcOut::Prune(prune));
1152
1153 peer_removed_from_mesh(
1155 peer_id,
1156 topic_hash,
1157 &self.mesh,
1158 &mut self.events,
1159 &self.connected_peers,
1160 );
1161 }
1162 }
1163 tracing::debug!(topic=%topic_hash, "Completed LEAVE for topic");
1164 }
1165
1166 fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1168 if !self.connected_peers.contains_key(peer_id) {
1169 tracing::debug!(peer=%peer_id, "Connecting to explicit peer");
1171 self.events.push_back(ToSwarm::Dial {
1172 opts: DialOpts::peer_id(*peer_id).build(),
1173 });
1174 }
1175 }
1176
1177 fn score_below_threshold(
1180 &self,
1181 peer_id: &PeerId,
1182 threshold: impl Fn(&PeerScoreThresholds) -> f64,
1183 ) -> (bool, f64) {
1184 Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold)
1185 }
1186
1187 fn score_below_threshold_from_scores(
1188 peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay)>,
1189 peer_id: &PeerId,
1190 threshold: impl Fn(&PeerScoreThresholds) -> f64,
1191 ) -> (bool, f64) {
1192 if let Some((peer_score, thresholds, ..)) = peer_score {
1193 let score = peer_score.score(peer_id);
1194 if score < threshold(thresholds) {
1195 return (true, score);
1196 }
1197 (false, score)
1198 } else {
1199 (false, 0.0)
1200 }
1201 }
1202
1203 fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1206 if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1208 tracing::debug!(
1209 peer=%peer_id,
1210 %score,
1211 "IHAVE: ignoring peer with score below threshold"
1212 );
1213 return;
1214 }
1215
1216 let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
1218 *peer_have += 1;
1219 if *peer_have > self.config.max_ihave_messages() {
1220 tracing::debug!(
1221 peer=%peer_id,
1222 "IHAVE: peer has advertised too many times ({}) within this heartbeat \
1223 interval; ignoring",
1224 *peer_have
1225 );
1226 return;
1227 }
1228
1229 if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1230 if *iasked >= self.config.max_ihave_length() {
1231 tracing::debug!(
1232 peer=%peer_id,
1233 "IHAVE: peer has already advertised too many messages ({}); ignoring",
1234 *iasked
1235 );
1236 return;
1237 }
1238 }
1239
1240 tracing::trace!(peer=%peer_id, "Handling IHAVE for peer");
1241
1242 let mut iwant_ids = HashSet::new();
1243
1244 let want_message = |id: &MessageId| {
1245 if self.duplicate_cache.contains(id) {
1246 return false;
1247 }
1248
1249 !self.gossip_promises.contains(id)
1250 };
1251
1252 for (topic, ids) in ihave_msgs {
1253 if !self.mesh.contains_key(&topic) {
1255 tracing::debug!(
1256 %topic,
1257 "IHAVE: Ignoring IHAVE - Not subscribed to topic"
1258 );
1259 continue;
1260 }
1261
1262 for id in ids.into_iter().filter(want_message) {
1263 if iwant_ids.insert(id) {
1265 if let Some(metrics) = self.metrics.as_mut() {
1267 metrics.register_iwant(&topic);
1268 }
1269 }
1270 }
1271 }
1272
1273 if !iwant_ids.is_empty() {
1274 let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1275 let mut iask = iwant_ids.len();
1276 if *iasked + iask > self.config.max_ihave_length() {
1277 iask = self.config.max_ihave_length().saturating_sub(*iasked);
1278 }
1279
1280 tracing::debug!(
1282 peer=%peer_id,
1283 "IHAVE: Asking for {} out of {} messages from peer",
1284 iask,
1285 iwant_ids.len()
1286 );
1287
1288 let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
1290 let mut rng = thread_rng();
1291 iwant_ids_vec.partial_shuffle(&mut rng, iask);
1292
1293 iwant_ids_vec.truncate(iask);
1294 *iasked += iask;
1295
1296 self.gossip_promises.add_promise(
1297 *peer_id,
1298 &iwant_ids_vec,
1299 Instant::now() + self.config.iwant_followup_time(),
1300 );
1301 tracing::trace!(
1302 peer=%peer_id,
1303 "IHAVE: Asking for the following messages from peer: {:?}",
1304 iwant_ids_vec
1305 );
1306
1307 self.send_message(
1308 *peer_id,
1309 RpcOut::IWant(IWant {
1310 message_ids: iwant_ids_vec,
1311 }),
1312 );
1313 }
1314 tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
1315 }
1316
1317 fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1320 if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1322 tracing::debug!(
1323 peer=%peer_id,
1324 "IWANT: ignoring peer with score below threshold [score = {}]",
1325 score
1326 );
1327 return;
1328 }
1329
1330 tracing::debug!(peer=%peer_id, "Handling IWANT for peer");
1331
1332 for id in iwant_msgs {
1333 if let Some((msg, count)) = self
1336 .mcache
1337 .get_with_iwant_counts(&id, peer_id)
1338 .map(|(msg, count)| (msg.clone(), count))
1339 {
1340 if count > self.config.gossip_retransimission() {
1341 tracing::debug!(
1342 peer=%peer_id,
1343 message=%id,
1344 "IWANT: Peer has asked for message too many times; ignoring request"
1345 );
1346 } else {
1347 tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
1348 self.send_message(
1349 *peer_id,
1350 RpcOut::Forward {
1351 message: msg,
1352 timeout: Delay::new(self.config.forward_queue_duration()),
1353 },
1354 );
1355 }
1356 }
1357 }
1358 tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
1359 }
1360
1361 fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
1364 tracing::debug!(peer=%peer_id, "Handling GRAFT message for peer");
1365
1366 let mut to_prune_topics = HashSet::new();
1367
1368 let mut do_px = self.config.do_px();
1369
1370 let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
1371 tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
1372 return;
1373 };
1374
1375 for topic in &topics {
1378 if connected_peer.topics.insert(topic.clone()) {
1379 if let Some(m) = self.metrics.as_mut() {
1380 m.inc_topic_peers(topic);
1381 }
1382 }
1383 }
1384
1385 if self.explicit_peers.contains(peer_id) {
1387 tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer");
1388 to_prune_topics = topics.into_iter().collect();
1390 do_px = false
1392 } else {
1393 let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0);
1394 let now = Instant::now();
1395 for topic_hash in topics {
1396 if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1397 if peers.contains(peer_id) {
1399 tracing::debug!(
1400 peer=%peer_id,
1401 topic=%&topic_hash,
1402 "GRAFT: Received graft for peer that is already in topic"
1403 );
1404 continue;
1405 }
1406
1407 if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
1409 {
1410 if backoff_time > now {
1411 tracing::warn!(
1412 peer=%peer_id,
1413 "[Penalty] Peer attempted graft within backoff time, penalizing"
1414 );
1415 if let Some((peer_score, ..)) = &mut self.peer_score {
1417 if let Some(metrics) = self.metrics.as_mut() {
1418 metrics.register_score_penalty(Penalty::GraftBackoff);
1419 }
1420 peer_score.add_penalty(peer_id, 1);
1421
1422 #[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
1425 let flood_cutoff = (backoff_time
1426 + self.config.graft_flood_threshold())
1427 - self.config.prune_backoff();
1428 if flood_cutoff > now {
1429 peer_score.add_penalty(peer_id, 1);
1431 }
1432 }
1433 do_px = false;
1435
1436 to_prune_topics.insert(topic_hash.clone());
1437 continue;
1438 }
1439 }
1440
1441 if below_zero {
1443 tracing::debug!(
1445 peer=%peer_id,
1446 %score,
1447 topic=%topic_hash,
1448 "GRAFT: ignoring peer with negative score"
1449 );
1450 to_prune_topics.insert(topic_hash.clone());
1453 do_px = false;
1455 continue;
1456 }
1457
1458 if peers.len() >= self.config.mesh_n_high()
1461 && !self.outbound_peers.contains(peer_id)
1462 {
1463 to_prune_topics.insert(topic_hash.clone());
1464 continue;
1465 }
1466
1467 tracing::debug!(
1469 peer=%peer_id,
1470 topic=%topic_hash,
1471 "GRAFT: Mesh link added for peer in topic"
1472 );
1473
1474 if peers.insert(*peer_id) {
1475 if let Some(m) = self.metrics.as_mut() {
1476 m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
1477 }
1478 }
1479
1480 peer_added_to_mesh(
1482 *peer_id,
1483 vec![&topic_hash],
1484 &self.mesh,
1485 &mut self.events,
1486 &self.connected_peers,
1487 );
1488
1489 if let Some((peer_score, ..)) = &mut self.peer_score {
1490 peer_score.graft(peer_id, topic_hash);
1491 }
1492 } else {
1493 do_px = false;
1495 tracing::debug!(
1496 peer=%peer_id,
1497 topic=%topic_hash,
1498 "GRAFT: Received graft for unknown topic from peer"
1499 );
1500 continue;
1502 }
1503 }
1504 }
1505
1506 if !to_prune_topics.is_empty() {
1507 let on_unsubscribe = false;
1509
1510 for prune in to_prune_topics
1511 .iter()
1512 .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
1513 .collect::<Vec<_>>()
1514 {
1515 self.send_message(*peer_id, RpcOut::Prune(prune));
1516 }
1517 tracing::debug!(
1519 peer=%peer_id,
1520 "GRAFT: Not subscribed to topics - Sending PRUNE to peer"
1521 );
1522 }
1523 tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
1524 }
1525
1526 fn remove_peer_from_mesh(
1527 &mut self,
1528 peer_id: &PeerId,
1529 topic_hash: &TopicHash,
1530 backoff: Option<u64>,
1531 always_update_backoff: bool,
1532 reason: Churn,
1533 ) {
1534 let mut update_backoff = always_update_backoff;
1535 if let Some(peers) = self.mesh.get_mut(topic_hash) {
1536 if peers.remove(peer_id) {
1538 tracing::debug!(
1539 peer=%peer_id,
1540 topic=%topic_hash,
1541 "PRUNE: Removing peer from the mesh for topic"
1542 );
1543 if let Some(m) = self.metrics.as_mut() {
1544 m.peers_removed(topic_hash, reason, 1)
1545 }
1546
1547 if let Some((peer_score, ..)) = &mut self.peer_score {
1548 peer_score.prune(peer_id, topic_hash.clone());
1549 }
1550
1551 update_backoff = true;
1552
1553 peer_removed_from_mesh(
1555 *peer_id,
1556 topic_hash,
1557 &self.mesh,
1558 &mut self.events,
1559 &self.connected_peers,
1560 );
1561 }
1562 }
1563 if update_backoff {
1564 let time = if let Some(backoff) = backoff {
1565 Duration::from_secs(backoff)
1566 } else {
1567 self.config.prune_backoff()
1568 };
1569 self.backoffs.update_backoff(topic_hash, peer_id, time);
1571 }
1572 }
1573
1574 fn handle_prune(
1576 &mut self,
1577 peer_id: &PeerId,
1578 prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1579 ) {
1580 tracing::debug!(peer=%peer_id, "Handling PRUNE message for peer");
1581 let (below_threshold, score) =
1582 self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
1583 for (topic_hash, px, backoff) in prune_data {
1584 self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune);
1585
1586 if self.mesh.contains_key(&topic_hash) {
1587 if !px.is_empty() {
1589 if below_threshold {
1591 tracing::debug!(
1592 peer=%peer_id,
1593 %score,
1594 topic=%topic_hash,
1595 "PRUNE: ignoring PX from peer with insufficient score"
1596 );
1597 continue;
1598 }
1599
1600 if self.config.prune_peers() > 0 {
1607 self.px_connect(px);
1608 }
1609 }
1610 }
1611 }
1612 tracing::debug!(peer=%peer_id, "Completed PRUNE handling for peer");
1613 }
1614
1615 fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1616 let n = self.config.prune_peers();
1617 px.retain(|p| p.peer_id.is_some());
1622 if px.len() > n {
1623 let mut rng = thread_rng();
1625 px.partial_shuffle(&mut rng, n);
1626 px = px.into_iter().take(n).collect();
1627 }
1628
1629 for p in px {
1630 if let Some(peer_id) = p.peer_id {
1633 self.px_peers.insert(peer_id);
1635
1636 self.events.push_back(ToSwarm::Dial {
1638 opts: DialOpts::peer_id(peer_id).build(),
1639 });
1640 }
1641 }
1642 }
1643
1644 fn message_is_valid(
1647 &mut self,
1648 msg_id: &MessageId,
1649 raw_message: &mut RawMessage,
1650 propagation_source: &PeerId,
1651 ) -> bool {
1652 tracing::debug!(
1653 peer=%propagation_source,
1654 message=%msg_id,
1655 "Handling message from peer"
1656 );
1657
1658 if self.blacklisted_peers.contains(propagation_source) {
1660 tracing::debug!(
1661 peer=%propagation_source,
1662 "Rejecting message from blacklisted peer"
1663 );
1664 self.gossip_promises
1665 .reject_message(msg_id, &RejectReason::BlackListedPeer);
1666 if let Some((peer_score, ..)) = &mut self.peer_score {
1667 peer_score.reject_message(
1668 propagation_source,
1669 msg_id,
1670 &raw_message.topic,
1671 RejectReason::BlackListedPeer,
1672 );
1673 }
1674 return false;
1675 }
1676
1677 if let Some(source) = raw_message.source.as_ref() {
1679 if self.blacklisted_peers.contains(source) {
1680 tracing::debug!(
1681 peer=%propagation_source,
1682 %source,
1683 "Rejecting message from peer because of blacklisted source"
1684 );
1685 self.handle_invalid_message(
1686 propagation_source,
1687 raw_message,
1688 RejectReason::BlackListedSource,
1689 );
1690 return false;
1691 }
1692 }
1693
1694 if !self.config.validate_messages() {
1698 raw_message.validated = true;
1699 }
1700
1701 let self_published = !self.config.allow_self_origin()
1703 && if let Some(own_id) = self.publish_config.get_own_id() {
1704 own_id != propagation_source
1705 && raw_message.source.as_ref().is_some_and(|s| s == own_id)
1706 } else {
1707 self.published_message_ids.contains(msg_id)
1708 };
1709
1710 if self_published {
1711 tracing::debug!(
1712 message=%msg_id,
1713 source=%propagation_source,
1714 "Dropping message claiming to be from self but forwarded from source"
1715 );
1716 self.handle_invalid_message(propagation_source, raw_message, RejectReason::SelfOrigin);
1717 return false;
1718 }
1719
1720 true
1721 }
1722
1723 fn handle_received_message(
1727 &mut self,
1728 mut raw_message: RawMessage,
1729 propagation_source: &PeerId,
1730 ) {
1731 if let Some(metrics) = self.metrics.as_mut() {
1733 metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
1734 }
1735
1736 let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1738 Ok(message) => message,
1739 Err(e) => {
1740 tracing::debug!("Invalid message. Transform error: {:?}", e);
1741 self.handle_invalid_message(
1743 propagation_source,
1744 &raw_message,
1745 RejectReason::ValidationError(ValidationError::TransformFailed),
1746 );
1747 return;
1748 }
1749 };
1750
1751 let msg_id = self.config.message_id(&message);
1753
1754 if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
1756 self.send_idontwant(&raw_message, &msg_id, Some(propagation_source));
1757 }
1758
1759 if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1763 return;
1764 }
1765
1766 if !self.duplicate_cache.insert(msg_id.clone()) {
1767 tracing::debug!(message=%msg_id, "Message already received, ignoring");
1768 if let Some((peer_score, ..)) = &mut self.peer_score {
1769 peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1770 }
1771 self.mcache.observe_duplicate(&msg_id, propagation_source);
1772 return;
1773 }
1774
1775 tracing::debug!(
1776 message=%msg_id,
1777 "Put message in duplicate_cache and resolve promises"
1778 );
1779
1780 if let Some(metrics) = self.metrics.as_mut() {
1782 metrics.msg_recvd(&message.topic);
1783 }
1784
1785 self.gossip_promises.message_delivered(&msg_id);
1788
1789 if let Some((peer_score, ..)) = &mut self.peer_score {
1791 peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1792 }
1793
1794 self.mcache.put(&msg_id, raw_message.clone());
1796
1797 if self.mesh.contains_key(&message.topic) {
1799 tracing::debug!("Sending received message to user");
1800 self.events
1801 .push_back(ToSwarm::GenerateEvent(Event::Message {
1802 propagation_source: *propagation_source,
1803 message_id: msg_id.clone(),
1804 message,
1805 }));
1806 } else {
1807 tracing::debug!(
1808 topic=%message.topic,
1809 "Received message on a topic we are not subscribed to"
1810 );
1811 return;
1812 }
1813
1814 if !self.config.validate_messages() {
1816 self.forward_msg(
1817 &msg_id,
1818 raw_message,
1819 Some(propagation_source),
1820 HashSet::new(),
1821 );
1822 tracing::debug!(message=%msg_id, "Completed message handling for message");
1823 }
1824 }
1825
1826 fn handle_invalid_message(
1828 &mut self,
1829 propagation_source: &PeerId,
1830 raw_message: &RawMessage,
1831 reject_reason: RejectReason,
1832 ) {
1833 if let Some(metrics) = self.metrics.as_mut() {
1834 metrics.register_invalid_message(&raw_message.topic);
1835 }
1836
1837 let message = self.data_transform.inbound_transform(raw_message.clone());
1838
1839 match (&mut self.peer_score, message) {
1840 (Some((peer_score, ..)), Ok(message)) => {
1841 let message_id = self.config.message_id(&message);
1842
1843 peer_score.reject_message(
1844 propagation_source,
1845 &message_id,
1846 &message.topic,
1847 reject_reason,
1848 );
1849
1850 self.gossip_promises
1851 .reject_message(&message_id, &reject_reason);
1852 }
1853 (Some((peer_score, ..)), Err(_)) => {
1854 peer_score.reject_invalid_message(propagation_source, &raw_message.topic);
1858 }
1859 (None, Ok(message)) => {
1860 let message_id = self.config.message_id(&message);
1862 self.gossip_promises
1863 .reject_message(&message_id, &reject_reason);
1864 }
1865 (None, Err(_)) => {}
1866 }
1867 }
1868
1869 fn handle_received_subscriptions(
1871 &mut self,
1872 subscriptions: &[Subscription],
1873 propagation_source: &PeerId,
1874 ) {
1875 tracing::debug!(
1876 source=%propagation_source,
1877 "Handling subscriptions: {:?}",
1878 subscriptions,
1879 );
1880
1881 let mut unsubscribed_peers = Vec::new();
1882
1883 let Some(peer) = self.connected_peers.get_mut(propagation_source) else {
1884 tracing::error!(
1885 peer=%propagation_source,
1886 "Subscription by unknown peer"
1887 );
1888 return;
1889 };
1890
1891 let mut topics_to_graft = Vec::new();
1893
1894 let mut application_event = Vec::new();
1896
1897 let filtered_topics = match self
1898 .subscription_filter
1899 .filter_incoming_subscriptions(subscriptions, &peer.topics)
1900 {
1901 Ok(topics) => topics,
1902 Err(s) => {
1903 tracing::error!(
1904 peer=%propagation_source,
1905 "Subscription filter error: {}; ignoring RPC from peer",
1906 s
1907 );
1908 return;
1909 }
1910 };
1911
1912 for subscription in filtered_topics {
1913 let topic_hash = &subscription.topic_hash;
1915
1916 match subscription.action {
1917 SubscriptionAction::Subscribe => {
1918 if peer.topics.insert(topic_hash.clone()) {
1919 tracing::debug!(
1920 peer=%propagation_source,
1921 topic=%topic_hash,
1922 "SUBSCRIPTION: Adding gossip peer to topic"
1923 );
1924
1925 if let Some(m) = self.metrics.as_mut() {
1926 m.inc_topic_peers(topic_hash);
1927 }
1928 }
1929
1930 if !self.explicit_peers.contains(propagation_source)
1932 && peer.kind.is_gossipsub()
1933 && !Self::score_below_threshold_from_scores(
1934 &self.peer_score,
1935 propagation_source,
1936 |_| 0.0,
1937 )
1938 .0
1939 && !self
1940 .backoffs
1941 .is_backoff_with_slack(topic_hash, propagation_source)
1942 {
1943 if let Some(peers) = self.mesh.get_mut(topic_hash) {
1944 if peers.len() < self.config.mesh_n_low()
1945 && peers.insert(*propagation_source)
1946 {
1947 tracing::debug!(
1948 peer=%propagation_source,
1949 topic=%topic_hash,
1950 "SUBSCRIPTION: Adding peer to the mesh for topic"
1951 );
1952 if let Some(m) = self.metrics.as_mut() {
1953 m.peers_included(topic_hash, Inclusion::Subscribed, 1)
1954 }
1955 tracing::debug!(
1957 peer=%propagation_source,
1958 topic=%topic_hash,
1959 "Sending GRAFT to peer for topic"
1960 );
1961 if let Some((peer_score, ..)) = &mut self.peer_score {
1962 peer_score.graft(propagation_source, topic_hash.clone());
1963 }
1964 topics_to_graft.push(topic_hash.clone());
1965 }
1966 }
1967 }
1968 application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
1970 peer_id: *propagation_source,
1971 topic: topic_hash.clone(),
1972 }));
1973 }
1974 SubscriptionAction::Unsubscribe => {
1975 if peer.topics.remove(topic_hash) {
1976 tracing::debug!(
1977 peer=%propagation_source,
1978 topic=%topic_hash,
1979 "SUBSCRIPTION: Removing gossip peer from topic"
1980 );
1981
1982 if let Some(m) = self.metrics.as_mut() {
1983 m.dec_topic_peers(topic_hash);
1984 }
1985 }
1986
1987 unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
1988 application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
1990 peer_id: *propagation_source,
1991 topic: topic_hash.clone(),
1992 }));
1993 }
1994 }
1995 }
1996
1997 for (peer_id, topic_hash) in unsubscribed_peers {
1999 self.fanout
2000 .get_mut(&topic_hash)
2001 .map(|peers| peers.remove(&peer_id));
2002 self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false, Churn::Unsub);
2003 }
2004
2005 let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
2007 if !topics_joined.is_empty() {
2008 peer_added_to_mesh(
2009 *propagation_source,
2010 topics_joined,
2011 &self.mesh,
2012 &mut self.events,
2013 &self.connected_peers,
2014 );
2015 }
2016
2017 for topic_hash in topics_to_graft.into_iter() {
2020 self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash }));
2021 }
2022
2023 for event in application_event {
2025 self.events.push_back(event);
2026 }
2027
2028 tracing::trace!(
2029 source=%propagation_source,
2030 "Completed handling subscriptions from source"
2031 );
2032 }
2033
2034 fn apply_iwant_penalties(&mut self) {
2036 if let Some((peer_score, ..)) = &mut self.peer_score {
2037 for (peer, count) in self.gossip_promises.get_broken_promises() {
2038 peer_score.add_penalty(&peer, count);
2039 if let Some(metrics) = self.metrics.as_mut() {
2040 metrics.register_score_penalty(Penalty::BrokenPromise);
2041 }
2042 }
2043 }
2044 }
2045
2046 fn heartbeat(&mut self) {
2048 tracing::debug!("Starting heartbeat");
2049 let start = Instant::now();
2050
2051 if let Some(m) = &mut self.metrics {
2055 for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
2056 m.observe_priority_queue_size(sender_queue.priority_queue_len());
2057 m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
2058 }
2059 }
2060
2061 self.heartbeat_ticks += 1;
2062
2063 let mut to_graft = HashMap::new();
2064 let mut to_prune = HashMap::new();
2065 let mut no_px = HashSet::new();
2066
2067 self.backoffs.heartbeat();
2069
2070 self.count_sent_iwant.clear();
2072 self.count_received_ihave.clear();
2073
2074 self.apply_iwant_penalties();
2076
2077 if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
2079 for p in self.explicit_peers.clone() {
2080 self.check_explicit_peer_connection(&p);
2081 }
2082 }
2083
2084 let mut scores = HashMap::with_capacity(self.connected_peers.len());
2086 if let Some((peer_score, ..)) = &self.peer_score {
2087 for peer_id in self.connected_peers.keys() {
2088 scores
2089 .entry(peer_id)
2090 .or_insert_with(|| peer_score.metric_score(peer_id, self.metrics.as_mut()));
2091 }
2092 }
2093
2094 for (topic_hash, peers) in self.mesh.iter_mut() {
2096 let explicit_peers = &self.explicit_peers;
2097 let backoffs = &self.backoffs;
2098 let outbound_peers = &self.outbound_peers;
2099
2100 let mut to_remove_peers = Vec::new();
2104 for peer_id in peers.iter() {
2105 let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
2106
2107 if let Some(metrics) = self.metrics.as_mut() {
2109 metrics.observe_mesh_peers_score(topic_hash, peer_score);
2110 }
2111
2112 if peer_score < 0.0 {
2113 tracing::debug!(
2114 peer=%peer_id,
2115 score=%peer_score,
2116 topic=%topic_hash,
2117 "HEARTBEAT: Prune peer with negative score"
2118 );
2119
2120 let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
2121 current_topic.push(topic_hash.clone());
2122 no_px.insert(*peer_id);
2123 to_remove_peers.push(*peer_id);
2124 }
2125 }
2126
2127 if let Some(m) = self.metrics.as_mut() {
2128 m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
2129 }
2130
2131 for peer_id in to_remove_peers {
2132 peers.remove(&peer_id);
2133 }
2134
2135 if peers.len() < self.config.mesh_n_low() {
2137 tracing::debug!(
2138 topic=%topic_hash,
2139 "HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
2140 peers.len(),
2141 self.config.mesh_n_low()
2142 );
2143 let desired_peers = self.config.mesh_n() - peers.len();
2145 let peer_list =
2146 get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2147 !peers.contains(peer)
2148 && !explicit_peers.contains(peer)
2149 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2150 && *scores.get(peer).unwrap_or(&0.0) >= 0.0
2151 });
2152 for peer in &peer_list {
2153 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2154 current_topic.push(topic_hash.clone());
2155 }
2156 tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2158 if let Some(m) = self.metrics.as_mut() {
2159 m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2160 }
2161 peers.extend(peer_list);
2162 }
2163
2164 if peers.len() > self.config.mesh_n_high() {
2166 tracing::debug!(
2167 topic=%topic_hash,
2168 "HEARTBEAT: Mesh high. Topic contains: {} needs: {}",
2169 peers.len(),
2170 self.config.mesh_n_high()
2171 );
2172 let excess_peer_no = peers.len() - self.config.mesh_n();
2173
2174 let mut rng = thread_rng();
2176 let mut shuffled = peers.iter().copied().collect::<Vec<_>>();
2177 shuffled.shuffle(&mut rng);
2178 shuffled.sort_by(|p1, p2| {
2179 let score_p1 = *scores.get(p1).unwrap_or(&0.0);
2180 let score_p2 = *scores.get(p2).unwrap_or(&0.0);
2181
2182 score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
2183 });
2184 shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2186
2187 let mut outbound = {
2189 let outbound_peers = &self.outbound_peers;
2190 shuffled
2191 .iter()
2192 .filter(|p| outbound_peers.contains(*p))
2193 .count()
2194 };
2195
2196 let mut removed = 0;
2199 for peer in shuffled {
2200 if removed == excess_peer_no {
2201 break;
2202 }
2203 if self.outbound_peers.contains(&peer) {
2204 if outbound <= self.config.mesh_outbound_min() {
2205 continue;
2207 }
2208 outbound -= 1;
2210 }
2211
2212 peers.remove(&peer);
2214 let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2215 current_topic.push(topic_hash.clone());
2216 removed += 1;
2217 }
2218
2219 if let Some(m) = self.metrics.as_mut() {
2220 m.peers_removed(topic_hash, Churn::Excess, removed)
2221 }
2222 }
2223
2224 if peers.len() >= self.config.mesh_n_low() {
2226 let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() };
2228
2229 if outbound < self.config.mesh_outbound_min() {
2231 let needed = self.config.mesh_outbound_min() - outbound;
2232 let peer_list =
2233 get_random_peers(&self.connected_peers, topic_hash, needed, |peer| {
2234 !peers.contains(peer)
2235 && !explicit_peers.contains(peer)
2236 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2237 && *scores.get(peer).unwrap_or(&0.0) >= 0.0
2238 && outbound_peers.contains(peer)
2239 });
2240 for peer in &peer_list {
2241 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2242 current_topic.push(topic_hash.clone());
2243 }
2244 tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2246 if let Some(m) = self.metrics.as_mut() {
2247 m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
2248 }
2249 peers.extend(peer_list);
2250 }
2251 }
2252
2253 if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2255 && peers.len() > 1
2256 && self.peer_score.is_some()
2257 {
2258 if let Some((_, thresholds, _)) = &self.peer_score {
2259 let mut peers_by_score: Vec<_> = peers.iter().collect();
2269 peers_by_score.sort_by(|p1, p2| {
2270 let p1_score = *scores.get(p1).unwrap_or(&0.0);
2271 let p2_score = *scores.get(p2).unwrap_or(&0.0);
2272 p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
2273 });
2274
2275 let middle = peers_by_score.len() / 2;
2276 let median = if peers_by_score.len() % 2 == 0 {
2277 let sub_middle_peer = *peers_by_score
2278 .get(middle - 1)
2279 .expect("middle < vector length and middle > 0 since peers.len() > 0");
2280 let sub_middle_score = *scores.get(sub_middle_peer).unwrap_or(&0.0);
2281 let middle_peer =
2282 *peers_by_score.get(middle).expect("middle < vector length");
2283 let middle_score = *scores.get(middle_peer).unwrap_or(&0.0);
2284
2285 (sub_middle_score + middle_score) * 0.5
2286 } else {
2287 *scores
2288 .get(*peers_by_score.get(middle).expect("middle < vector length"))
2289 .unwrap_or(&0.0)
2290 };
2291
2292 if median < thresholds.opportunistic_graft_threshold {
2295 let peer_list = get_random_peers(
2296 &self.connected_peers,
2297 topic_hash,
2298 self.config.opportunistic_graft_peers(),
2299 |peer_id| {
2300 !peers.contains(peer_id)
2301 && !explicit_peers.contains(peer_id)
2302 && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2303 && *scores.get(peer_id).unwrap_or(&0.0) > median
2304 },
2305 );
2306 for peer in &peer_list {
2307 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2308 current_topic.push(topic_hash.clone());
2309 }
2310 tracing::debug!(
2312 topic=%topic_hash,
2313 "Opportunistically graft in topic with peers {:?}",
2314 peer_list
2315 );
2316 if let Some(m) = self.metrics.as_mut() {
2317 m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2318 }
2319 peers.extend(peer_list);
2320 }
2321 }
2322 }
2323 if let Some(m) = self.metrics.as_mut() {
2325 m.set_mesh_peers(topic_hash, peers.len())
2326 }
2327 }
2328
2329 {
2331 let fanout = &mut self.fanout; let fanout_ttl = self.config.fanout_ttl();
2333 self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2334 if *last_pub_time + fanout_ttl < Instant::now() {
2335 tracing::debug!(
2336 topic=%topic_hash,
2337 "HEARTBEAT: Fanout topic removed due to timeout"
2338 );
2339 fanout.remove(topic_hash);
2340 return false;
2341 }
2342 true
2343 });
2344 }
2345
2346 for (topic_hash, peers) in self.fanout.iter_mut() {
2349 let mut to_remove_peers = Vec::new();
2350 let publish_threshold = match &self.peer_score {
2351 Some((_, thresholds, _)) => thresholds.publish_threshold,
2352 _ => 0.0,
2353 };
2354 for peer_id in peers.iter() {
2355 let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
2357 match self.connected_peers.get(peer_id) {
2358 Some(peer) => {
2359 if !peer.topics.contains(topic_hash) || peer_score < publish_threshold {
2360 tracing::debug!(
2361 topic=%topic_hash,
2362 "HEARTBEAT: Peer removed from fanout for topic"
2363 );
2364 to_remove_peers.push(*peer_id);
2365 }
2366 }
2367 None => {
2368 to_remove_peers.push(*peer_id);
2370 }
2371 }
2372 }
2373 for to_remove in to_remove_peers {
2374 peers.remove(&to_remove);
2375 }
2376
2377 if peers.len() < self.config.mesh_n() {
2379 tracing::debug!(
2380 "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2381 peers.len(),
2382 self.config.mesh_n()
2383 );
2384 let needed_peers = self.config.mesh_n() - peers.len();
2385 let explicit_peers = &self.explicit_peers;
2386 let new_peers =
2387 get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
2388 !peers.contains(peer_id)
2389 && !explicit_peers.contains(peer_id)
2390 && *scores.get(peer_id).unwrap_or(&0.0) < publish_threshold
2391 });
2392 peers.extend(new_peers);
2393 }
2394 }
2395
2396 if self.peer_score.is_some() {
2397 tracing::trace!("Mesh message deliveries: {:?}", {
2398 self.mesh
2399 .iter()
2400 .map(|(t, peers)| {
2401 (
2402 t.clone(),
2403 peers
2404 .iter()
2405 .map(|p| {
2406 (
2407 *p,
2408 self.peer_score
2409 .as_ref()
2410 .expect("peer_score.is_some()")
2411 .0
2412 .mesh_message_deliveries(p, t)
2413 .unwrap_or(0.0),
2414 )
2415 })
2416 .collect::<HashMap<PeerId, f64>>(),
2417 )
2418 })
2419 .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2420 })
2421 }
2422
2423 self.emit_gossip();
2424
2425 if !to_graft.is_empty() | !to_prune.is_empty() {
2427 self.send_graft_prune(to_graft, to_prune, no_px);
2428 }
2429
2430 self.mcache.shift();
2432
2433 for (peer_id, failed_messages) in self.failed_messages.drain() {
2435 tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
2436 self.events
2437 .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
2438 peer_id,
2439 failed_messages,
2440 }));
2441 }
2442 self.failed_messages.shrink_to_fit();
2443
2444 for peer in self.connected_peers.values_mut() {
2446 while let Some((_front, instant)) = peer.dont_send.front() {
2447 if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
2448 break;
2449 } else {
2450 peer.dont_send.pop_front();
2451 }
2452 }
2453 }
2454
2455 tracing::debug!("Completed Heartbeat");
2456 if let Some(metrics) = self.metrics.as_mut() {
2457 let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2458 metrics.observe_heartbeat_duration(duration);
2459 }
2460 }
2461
2462 fn emit_gossip(&mut self) {
2465 let mut rng = thread_rng();
2466 let mut messages = Vec::new();
2467 for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2468 let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
2469 if message_ids.is_empty() {
2470 continue;
2471 }
2472
2473 if message_ids.len() > self.config.max_ihave_length() {
2475 tracing::debug!(
2477 "too many messages for gossip; will truncate IHAVE list ({} messages)",
2478 message_ids.len()
2479 );
2480 } else {
2481 message_ids.shuffle(&mut rng);
2483 }
2484
2485 let n_map = |m| {
2487 max(
2488 self.config.gossip_lazy(),
2489 (self.config.gossip_factor() * m as f64) as usize,
2490 )
2491 };
2492 let to_msg_peers =
2494 get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2495 !peers.contains(peer)
2496 && !self.explicit_peers.contains(peer)
2497 && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0
2498 });
2499
2500 tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
2501
2502 for peer_id in to_msg_peers {
2503 let mut peer_message_ids = message_ids.clone();
2504
2505 if peer_message_ids.len() > self.config.max_ihave_length() {
2506 peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2510 peer_message_ids.truncate(self.config.max_ihave_length());
2511 }
2512
2513 messages.push((
2515 peer_id,
2516 RpcOut::IHave(IHave {
2517 topic_hash: topic_hash.clone(),
2518 message_ids: peer_message_ids,
2519 }),
2520 ));
2521 }
2522 }
2523 for (peer_id, message) in messages {
2524 self.send_message(peer_id, message);
2525 }
2526 }
2527
2528 fn send_graft_prune(
2531 &mut self,
2532 to_graft: HashMap<PeerId, Vec<TopicHash>>,
2533 mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2534 no_px: HashSet<PeerId>,
2535 ) {
2536 for (peer_id, topics) in to_graft.into_iter() {
2538 for topic in &topics {
2539 if let Some((peer_score, ..)) = &mut self.peer_score {
2541 peer_score.graft(&peer_id, topic.clone());
2542 }
2543
2544 peer_added_to_mesh(
2547 peer_id,
2548 vec![topic],
2549 &self.mesh,
2550 &mut self.events,
2551 &self.connected_peers,
2552 );
2553 }
2554 let rpc_msgs = topics.iter().map(|topic_hash| {
2555 RpcOut::Graft(Graft {
2556 topic_hash: topic_hash.clone(),
2557 })
2558 });
2559
2560 let prune_msgs = to_prune
2567 .remove(&peer_id)
2568 .into_iter()
2569 .flatten()
2570 .map(|topic_hash| {
2571 let prune = self.make_prune(
2572 &topic_hash,
2573 &peer_id,
2574 self.config.do_px() && !no_px.contains(&peer_id),
2575 false,
2576 );
2577 RpcOut::Prune(prune)
2578 });
2579
2580 for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() {
2582 self.send_message(peer_id, msg);
2583 }
2584 }
2585
2586 for (peer_id, topics) in to_prune.iter() {
2589 for topic_hash in topics {
2590 let prune = self.make_prune(
2591 topic_hash,
2592 peer_id,
2593 self.config.do_px() && !no_px.contains(peer_id),
2594 false,
2595 );
2596 self.send_message(*peer_id, RpcOut::Prune(prune));
2597
2598 peer_removed_from_mesh(
2600 *peer_id,
2601 topic_hash,
2602 &self.mesh,
2603 &mut self.events,
2604 &self.connected_peers,
2605 );
2606 }
2607 }
2608 }
2609
2610 fn send_idontwant(
2612 &mut self,
2613 message: &RawMessage,
2614 msg_id: &MessageId,
2615 propagation_source: Option<&PeerId>,
2616 ) {
2617 let Some(mesh_peers) = self.mesh.get(&message.topic) else {
2618 return;
2619 };
2620
2621 let iwant_peers = self.gossip_promises.peers_for_message(msg_id);
2622
2623 let recipient_peers: Vec<PeerId> = mesh_peers
2624 .iter()
2625 .chain(iwant_peers.iter())
2626 .filter(|&peer_id| {
2627 Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref()
2628 })
2629 .cloned()
2630 .collect();
2631
2632 for peer_id in recipient_peers {
2633 let Some(peer) = self.connected_peers.get_mut(&peer_id) else {
2634 tracing::error!(peer = %peer_id,
2635 "Could not IDONTWANT, peer doesn't exist in connected peer list");
2636 continue;
2637 };
2638
2639 if peer.kind != PeerKind::Gossipsubv1_2 {
2641 continue;
2642 }
2643
2644 self.send_message(
2645 peer_id,
2646 RpcOut::IDontWant(IDontWant {
2647 message_ids: vec![msg_id.clone()],
2648 }),
2649 );
2650 }
2651 }
2652
2653 fn forward_msg(
2657 &mut self,
2658 msg_id: &MessageId,
2659 message: RawMessage,
2660 propagation_source: Option<&PeerId>,
2661 originating_peers: HashSet<PeerId>,
2662 ) -> bool {
2663 if let Some((peer_score, ..)) = &mut self.peer_score {
2665 if let Some(peer) = propagation_source {
2666 peer_score.deliver_message(peer, msg_id, &message.topic);
2667 }
2668 }
2669
2670 tracing::debug!(message=%msg_id, "Forwarding message");
2671 let mut recipient_peers = HashSet::new();
2672
2673 for peer_id in &self.explicit_peers {
2677 let Some(peer) = self.connected_peers.get(peer_id) else {
2678 continue;
2679 };
2680 if Some(peer_id) != propagation_source
2681 && !originating_peers.contains(peer_id)
2682 && Some(peer_id) != message.source.as_ref()
2683 && peer.topics.contains(&message.topic)
2684 {
2685 recipient_peers.insert(*peer_id);
2686 }
2687 }
2688
2689 let topic = &message.topic;
2691 if let Some(mesh_peers) = self.mesh.get(topic) {
2693 for peer_id in mesh_peers {
2694 if Some(peer_id) != propagation_source
2695 && !originating_peers.contains(peer_id)
2696 && Some(peer_id) != message.source.as_ref()
2697 {
2698 recipient_peers.insert(*peer_id);
2699 }
2700 }
2701 }
2702
2703 if recipient_peers.is_empty() {
2704 return false;
2705 }
2706
2707 for peer_id in recipient_peers.iter() {
2709 if let Some(peer) = self.connected_peers.get_mut(peer_id) {
2710 if peer.dont_send.contains_key(msg_id) {
2711 tracing::debug!(%peer_id, message=%msg_id, "Peer doesn't want message");
2712 continue;
2713 }
2714
2715 tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer");
2716
2717 self.send_message(
2718 *peer_id,
2719 RpcOut::Forward {
2720 message: message.clone(),
2721 timeout: Delay::new(self.config.forward_queue_duration()),
2722 },
2723 );
2724 }
2725 }
2726 tracing::debug!("Completed forwarding message");
2727 true
2728 }
2729
2730 pub(crate) fn build_raw_message(
2732 &mut self,
2733 topic: TopicHash,
2734 data: Vec<u8>,
2735 ) -> Result<RawMessage, PublishError> {
2736 match &mut self.publish_config {
2737 PublishConfig::Signing {
2738 ref keypair,
2739 author,
2740 inline_key,
2741 last_seq_no,
2742 } => {
2743 let sequence_number = last_seq_no.next();
2744
2745 let signature = {
2746 let message = proto::Message {
2747 from: Some(author.to_bytes()),
2748 data: Some(data.clone()),
2749 seqno: Some(sequence_number.to_be_bytes().to_vec()),
2750 topic: topic.clone().into_string(),
2751 signature: None,
2752 key: None,
2753 };
2754
2755 let mut buf = Vec::with_capacity(message.get_size());
2756 let mut writer = Writer::new(&mut buf);
2757
2758 message
2759 .write_message(&mut writer)
2760 .expect("Encoding to succeed");
2761
2762 let mut signature_bytes = SIGNING_PREFIX.to_vec();
2764 signature_bytes.extend_from_slice(&buf);
2765 Some(keypair.sign(&signature_bytes)?)
2766 };
2767
2768 Ok(RawMessage {
2769 source: Some(*author),
2770 data,
2771 sequence_number: Some(sequence_number),
2774 topic,
2775 signature,
2776 key: inline_key.clone(),
2777 validated: true, })
2779 }
2780 PublishConfig::Author(peer_id) => {
2781 Ok(RawMessage {
2782 source: Some(*peer_id),
2783 data,
2784 sequence_number: Some(rand::random()),
2787 topic,
2788 signature: None,
2789 key: None,
2790 validated: true, })
2792 }
2793 PublishConfig::RandomAuthor => {
2794 Ok(RawMessage {
2795 source: Some(PeerId::random()),
2796 data,
2797 sequence_number: Some(rand::random()),
2800 topic,
2801 signature: None,
2802 key: None,
2803 validated: true, })
2805 }
2806 PublishConfig::Anonymous => {
2807 Ok(RawMessage {
2808 source: None,
2809 data,
2810 sequence_number: None,
2813 topic,
2814 signature: None,
2815 key: None,
2816 validated: true, })
2818 }
2819 }
2820 }
2821
2822 fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
2829 if let Some(m) = self.metrics.as_mut() {
2830 if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc {
2831 m.msg_sent(&message.topic, message.raw_protobuf_len());
2833 }
2834 }
2835
2836 let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else {
2837 tracing::error!(peer = %peer_id,
2838 "Could not send rpc to connection handler, peer doesn't exist in connected peer list");
2839 return false;
2840 };
2841
2842 match peer.sender.send_message(rpc) {
2844 Ok(()) => true,
2845 Err(rpc) => {
2846 tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc);
2848
2849 let failed_messages = self.failed_messages.entry(peer_id).or_default();
2851 match rpc {
2852 RpcOut::Publish { .. } => {
2853 failed_messages.priority += 1;
2854 failed_messages.publish += 1;
2855 }
2856 RpcOut::Forward { .. } => {
2857 failed_messages.non_priority += 1;
2858 failed_messages.forward += 1;
2859 }
2860 RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => {
2861 failed_messages.non_priority += 1;
2862 }
2863 RpcOut::Graft(_)
2864 | RpcOut::Prune(_)
2865 | RpcOut::Subscribe(_)
2866 | RpcOut::Unsubscribe(_) => {
2867 unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
2868 }
2869 }
2870
2871 if let Some((peer_score, ..)) = &mut self.peer_score {
2873 peer_score.failed_message_slow_peer(&peer_id);
2874 }
2875
2876 false
2877 }
2878 }
2879 }
2880
2881 fn on_connection_established(
2882 &mut self,
2883 ConnectionEstablished {
2884 peer_id,
2885 endpoint,
2886 other_established,
2887 ..
2888 }: ConnectionEstablished,
2889 ) {
2890 if endpoint.is_dialer() && other_established == 0 && !self.px_peers.contains(&peer_id) {
2894 self.outbound_peers.insert(peer_id);
2897 }
2898
2899 if let Some((peer_score, ..)) = &mut self.peer_score {
2901 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2902 peer_score.add_ip(&peer_id, ip);
2903 } else {
2904 tracing::trace!(
2905 peer=%peer_id,
2906 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2907 endpoint
2908 )
2909 }
2910 }
2911
2912 if other_established > 0 {
2913 return; }
2915
2916 if let Some((peer_score, ..)) = &mut self.peer_score {
2917 peer_score.add_peer(peer_id);
2918 }
2919
2920 if self.blacklisted_peers.contains(&peer_id) {
2922 tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
2923 return;
2924 }
2925
2926 tracing::debug!(peer=%peer_id, "New peer connected");
2927 for topic_hash in self.mesh.clone().into_keys() {
2929 self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2930 }
2931 }
2932
2933 fn on_connection_closed(
2934 &mut self,
2935 ConnectionClosed {
2936 peer_id,
2937 connection_id,
2938 endpoint,
2939 remaining_established,
2940 ..
2941 }: ConnectionClosed,
2942 ) {
2943 if let Some((peer_score, ..)) = &mut self.peer_score {
2945 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2946 peer_score.remove_ip(&peer_id, &ip);
2947 } else {
2948 tracing::trace!(
2949 peer=%peer_id,
2950 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2951 endpoint
2952 )
2953 }
2954 }
2955
2956 if remaining_established != 0 {
2957 if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
2959 let index = peer
2960 .connections
2961 .iter()
2962 .position(|v| v == &connection_id)
2963 .expect("Previously established connection to peer must be present");
2964 peer.connections.remove(index);
2965
2966 if !peer.connections.is_empty() {
2969 for topic in &peer.topics {
2970 if let Some(mesh_peers) = self.mesh.get(topic) {
2971 if mesh_peers.contains(&peer_id) {
2972 self.events.push_back(ToSwarm::NotifyHandler {
2973 peer_id,
2974 event: HandlerIn::JoinedMesh,
2975 handler: NotifyHandler::One(peer.connections[0]),
2976 });
2977 break;
2978 }
2979 }
2980 }
2981 }
2982 }
2983 } else {
2984 tracing::debug!(peer=%peer_id, "Peer disconnected");
2986 let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
2987 tracing::error!(peer_id = %peer_id, "Peer non-existent when handling disconnection");
2988 return;
2989 };
2990
2991 for topic in &connected_peer.topics {
2993 if let Some(mesh_peers) = self.mesh.get_mut(topic) {
2995 if mesh_peers.remove(&peer_id) {
2997 if let Some(m) = self.metrics.as_mut() {
2998 m.peers_removed(topic, Churn::Dc, 1);
2999 m.set_mesh_peers(topic, mesh_peers.len());
3000 }
3001 };
3002 }
3003
3004 if let Some(m) = self.metrics.as_mut() {
3005 m.dec_topic_peers(topic);
3006 }
3007
3008 self.fanout
3010 .get_mut(topic)
3011 .map(|peers| peers.remove(&peer_id));
3012 }
3013
3014 self.px_peers.remove(&peer_id);
3016 self.outbound_peers.remove(&peer_id);
3017
3018 if let Some(metrics) = self.metrics.as_mut() {
3020 metrics.peer_protocol_disconnected(connected_peer.kind);
3021 }
3022
3023 self.connected_peers.remove(&peer_id);
3024
3025 if let Some((peer_score, ..)) = &mut self.peer_score {
3026 peer_score.remove_peer(&peer_id);
3027 }
3028 }
3029 }
3030
3031 fn on_address_change(
3032 &mut self,
3033 AddressChange {
3034 peer_id,
3035 old: endpoint_old,
3036 new: endpoint_new,
3037 ..
3038 }: AddressChange,
3039 ) {
3040 if let Some((peer_score, ..)) = &mut self.peer_score {
3042 if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
3043 peer_score.remove_ip(&peer_id, &ip);
3044 } else {
3045 tracing::trace!(
3046 peer=%&peer_id,
3047 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3048 endpoint_old
3049 )
3050 }
3051 if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
3052 peer_score.add_ip(&peer_id, ip);
3053 } else {
3054 tracing::trace!(
3055 peer=%peer_id,
3056 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3057 endpoint_new
3058 )
3059 }
3060 }
3061 }
3062}
3063
3064fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
3065 addr.iter().find_map(|p| match p {
3066 Ip4(addr) => Some(IpAddr::V4(addr)),
3067 Ip6(addr) => Some(IpAddr::V6(addr)),
3068 _ => None,
3069 })
3070}
3071
3072impl<C, F> NetworkBehaviour for Behaviour<C, F>
3073where
3074 C: Send + 'static + DataTransform,
3075 F: Send + 'static + TopicSubscriptionFilter,
3076{
3077 type ConnectionHandler = Handler;
3078 type ToSwarm = Event;
3079
3080 fn handle_established_inbound_connection(
3081 &mut self,
3082 connection_id: ConnectionId,
3083 peer_id: PeerId,
3084 _: &Multiaddr,
3085 _: &Multiaddr,
3086 ) -> Result<THandler<Self>, ConnectionDenied> {
3087 let connected_peer = self
3093 .connected_peers
3094 .entry(peer_id)
3095 .or_insert(PeerConnections {
3096 kind: PeerKind::Floodsub,
3097 connections: vec![],
3098 sender: Sender::new(self.config.connection_handler_queue_len()),
3099 topics: Default::default(),
3100 dont_send: LinkedHashMap::new(),
3101 });
3102 connected_peer.connections.push(connection_id);
3104
3105 Ok(Handler::new(
3106 self.config.protocol_config(),
3107 connected_peer.sender.new_receiver(),
3108 ))
3109 }
3110
3111 fn handle_established_outbound_connection(
3112 &mut self,
3113 connection_id: ConnectionId,
3114 peer_id: PeerId,
3115 _: &Multiaddr,
3116 _: Endpoint,
3117 _: PortUse,
3118 ) -> Result<THandler<Self>, ConnectionDenied> {
3119 let connected_peer = self
3120 .connected_peers
3121 .entry(peer_id)
3122 .or_insert(PeerConnections {
3123 kind: PeerKind::Floodsub,
3124 connections: vec![],
3125 sender: Sender::new(self.config.connection_handler_queue_len()),
3126 topics: Default::default(),
3127 dont_send: LinkedHashMap::new(),
3128 });
3129 connected_peer.connections.push(connection_id);
3131
3132 Ok(Handler::new(
3133 self.config.protocol_config(),
3134 connected_peer.sender.new_receiver(),
3135 ))
3136 }
3137
3138 fn on_connection_handler_event(
3139 &mut self,
3140 propagation_source: PeerId,
3141 _connection_id: ConnectionId,
3142 handler_event: THandlerOutEvent<Self>,
3143 ) {
3144 match handler_event {
3145 HandlerEvent::PeerKind(kind) => {
3146 if let Some(metrics) = self.metrics.as_mut() {
3149 metrics.peer_protocol_connected(kind);
3150 }
3151
3152 if let PeerKind::NotSupported = kind {
3153 tracing::debug!(
3154 peer=%propagation_source,
3155 "Peer does not support gossipsub protocols"
3156 );
3157 self.events
3158 .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
3159 peer_id: propagation_source,
3160 }));
3161 } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
3162 tracing::debug!(
3166 peer=%propagation_source,
3167 peer_type=%kind,
3168 "New peer type found for peer"
3169 );
3170 if let PeerKind::Floodsub = conn.kind {
3171 conn.kind = kind;
3172 }
3173 }
3174 }
3175 HandlerEvent::MessageDropped(rpc) => {
3176 if let Some((peer_score, _, _)) = &mut self.peer_score {
3178 peer_score.failed_message_slow_peer(&propagation_source);
3179 }
3180
3181 let failed_messages = self.failed_messages.entry(propagation_source).or_default();
3183 failed_messages.timeout += 1;
3184 match rpc {
3185 RpcOut::Publish { .. } => {
3186 failed_messages.publish += 1;
3187 }
3188 RpcOut::Forward { .. } => {
3189 failed_messages.forward += 1;
3190 }
3191 _ => {}
3192 }
3193
3194 if let Some(metrics) = self.metrics.as_mut() {
3196 match rpc {
3197 RpcOut::Publish { message, .. } => {
3198 metrics.publish_msg_dropped(&message.topic);
3199 metrics.timeout_msg_dropped(&message.topic);
3200 }
3201 RpcOut::Forward { message, .. } => {
3202 metrics.forward_msg_dropped(&message.topic);
3203 metrics.timeout_msg_dropped(&message.topic);
3204 }
3205 _ => {}
3206 }
3207 }
3208 }
3209 HandlerEvent::Message {
3210 rpc,
3211 invalid_messages,
3212 } => {
3213 if !rpc.subscriptions.is_empty() {
3218 self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
3219 }
3220
3221 if let (true, _) =
3223 self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
3224 {
3225 tracing::debug!(peer=%propagation_source, "RPC Dropped from greylisted peer");
3226 return;
3227 }
3228
3229 if self.peer_score.is_some() {
3231 for (raw_message, validation_error) in invalid_messages {
3232 self.handle_invalid_message(
3233 &propagation_source,
3234 &raw_message,
3235 RejectReason::ValidationError(validation_error),
3236 )
3237 }
3238 } else {
3239 for (message, validation_error) in invalid_messages {
3241 tracing::warn!(
3242 peer=%propagation_source,
3243 source=?message.source,
3244 "Invalid message from peer. Reason: {:?}",
3245 validation_error,
3246 );
3247 }
3248 }
3249
3250 for (count, raw_message) in rpc.messages.into_iter().enumerate() {
3252 if self.config.max_messages_per_rpc().is_some()
3254 && Some(count) >= self.config.max_messages_per_rpc()
3255 {
3256 tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
3257 break;
3258 }
3259 self.handle_received_message(raw_message, &propagation_source);
3260 }
3261
3262 let mut ihave_msgs = vec![];
3266 let mut graft_msgs = vec![];
3267 let mut prune_msgs = vec![];
3268 for control_msg in rpc.control_msgs {
3269 match control_msg {
3270 ControlAction::IHave(IHave {
3271 topic_hash,
3272 message_ids,
3273 }) => {
3274 ihave_msgs.push((topic_hash, message_ids));
3275 }
3276 ControlAction::IWant(IWant { message_ids }) => {
3277 self.handle_iwant(&propagation_source, message_ids)
3278 }
3279 ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
3280 ControlAction::Prune(Prune {
3281 topic_hash,
3282 peers,
3283 backoff,
3284 }) => prune_msgs.push((topic_hash, peers, backoff)),
3285 ControlAction::IDontWant(IDontWant { message_ids }) => {
3286 let Some(peer) = self.connected_peers.get_mut(&propagation_source)
3287 else {
3288 tracing::error!(peer = %propagation_source,
3289 "Could not handle IDONTWANT, peer doesn't exist in connected peer list");
3290 continue;
3291 };
3292 if let Some(metrics) = self.metrics.as_mut() {
3293 metrics.register_idontwant(message_ids.len());
3294 }
3295 for message_id in message_ids {
3296 peer.dont_send.insert(message_id, Instant::now());
3297 if peer.dont_send.len() > IDONTWANT_CAP {
3299 peer.dont_send.pop_front();
3300 }
3301 }
3302 }
3303 }
3304 }
3305 if !ihave_msgs.is_empty() {
3306 self.handle_ihave(&propagation_source, ihave_msgs);
3307 }
3308 if !graft_msgs.is_empty() {
3309 self.handle_graft(&propagation_source, graft_msgs);
3310 }
3311 if !prune_msgs.is_empty() {
3312 self.handle_prune(&propagation_source, prune_msgs);
3313 }
3314 }
3315 }
3316 }
3317
3318 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
3319 fn poll(
3320 &mut self,
3321 cx: &mut Context<'_>,
3322 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
3323 if let Some(event) = self.events.pop_front() {
3324 return Poll::Ready(event);
3325 }
3326
3327 if let Some((peer_score, _, delay)) = &mut self.peer_score {
3329 if delay.poll_unpin(cx).is_ready() {
3330 peer_score.refresh_scores();
3331 delay.reset(peer_score.params.decay_interval);
3332 }
3333 }
3334
3335 if self.heartbeat.poll_unpin(cx).is_ready() {
3336 self.heartbeat();
3337 self.heartbeat.reset(self.config.heartbeat_interval());
3338 }
3339
3340 Poll::Pending
3341 }
3342
3343 fn on_swarm_event(&mut self, event: FromSwarm) {
3344 match event {
3345 FromSwarm::ConnectionEstablished(connection_established) => {
3346 self.on_connection_established(connection_established)
3347 }
3348 FromSwarm::ConnectionClosed(connection_closed) => {
3349 self.on_connection_closed(connection_closed)
3350 }
3351 FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
3352 _ => {}
3353 }
3354 }
3355}
3356
3357fn peer_added_to_mesh(
3361 peer_id: PeerId,
3362 new_topics: Vec<&TopicHash>,
3363 mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3364 events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3365 connections: &HashMap<PeerId, PeerConnections>,
3366) {
3367 let connection_id = match connections.get(&peer_id) {
3369 Some(p) => p
3370 .connections
3371 .first()
3372 .expect("There should be at least one connection to a peer."),
3373 None => {
3374 tracing::error!(peer_id=%peer_id, "Peer not existent when added to the mesh");
3375 return;
3376 }
3377 };
3378
3379 if let Some(peer) = connections.get(&peer_id) {
3380 for topic in &peer.topics {
3381 if !new_topics.contains(&topic) {
3382 if let Some(mesh_peers) = mesh.get(topic) {
3383 if mesh_peers.contains(&peer_id) {
3384 return;
3386 }
3387 }
3388 }
3389 }
3390 }
3391 events.push_back(ToSwarm::NotifyHandler {
3393 peer_id,
3394 event: HandlerIn::JoinedMesh,
3395 handler: NotifyHandler::One(*connection_id),
3396 });
3397}
3398
3399fn peer_removed_from_mesh(
3403 peer_id: PeerId,
3404 old_topic: &TopicHash,
3405 mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3406 events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3407 connections: &HashMap<PeerId, PeerConnections>,
3408) {
3409 let connection_id = match connections.get(&peer_id) {
3411 Some(p) => p
3412 .connections
3413 .first()
3414 .expect("There should be at least one connection to a peer."),
3415 None => {
3416 tracing::error!(peer_id=%peer_id, "Peer not existent when removed from mesh");
3417 return;
3418 }
3419 };
3420
3421 if let Some(peer) = connections.get(&peer_id) {
3422 for topic in &peer.topics {
3423 if topic != old_topic {
3424 if let Some(mesh_peers) = mesh.get(topic) {
3425 if mesh_peers.contains(&peer_id) {
3426 return;
3428 }
3429 }
3430 }
3431 }
3432 }
3433 events.push_back(ToSwarm::NotifyHandler {
3435 peer_id,
3436 event: HandlerIn::LeftMesh,
3437 handler: NotifyHandler::One(*connection_id),
3438 });
3439}
3440
3441fn get_random_peers_dynamic(
3445 connected_peers: &HashMap<PeerId, PeerConnections>,
3446 topic_hash: &TopicHash,
3447 n_map: impl Fn(usize) -> usize,
3449 mut f: impl FnMut(&PeerId) -> bool,
3450) -> BTreeSet<PeerId> {
3451 let mut gossip_peers = connected_peers
3452 .iter()
3453 .filter(|(_, p)| p.topics.contains(topic_hash))
3454 .filter(|(peer_id, _)| f(peer_id))
3455 .filter(|(_, p)| p.kind.is_gossipsub())
3456 .map(|(peer_id, _)| *peer_id)
3457 .collect::<Vec<PeerId>>();
3458
3459 let n = n_map(gossip_peers.len());
3461 if gossip_peers.len() <= n {
3462 tracing::debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3463 return gossip_peers.into_iter().collect();
3464 }
3465
3466 let mut rng = thread_rng();
3468 gossip_peers.partial_shuffle(&mut rng, n);
3469
3470 tracing::debug!("RANDOM PEERS: Got {:?} peers", n);
3471
3472 gossip_peers.into_iter().take(n).collect()
3473}
3474
3475fn get_random_peers(
3478 connected_peers: &HashMap<PeerId, PeerConnections>,
3479 topic_hash: &TopicHash,
3480 n: usize,
3481 f: impl FnMut(&PeerId) -> bool,
3482) -> BTreeSet<PeerId> {
3483 get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
3484}
3485
3486fn validate_config(
3489 authenticity: &MessageAuthenticity,
3490 validation_mode: &ValidationMode,
3491) -> Result<(), &'static str> {
3492 match validation_mode {
3493 ValidationMode::Anonymous => {
3494 if authenticity.is_signing() {
3495 return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3496 }
3497
3498 if !authenticity.is_anonymous() {
3499 return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3500 }
3501 }
3502 ValidationMode::Strict => {
3503 if !authenticity.is_signing() {
3504 return Err(
3505 "Messages will be
3506 published unsigned and incoming unsigned messages will be rejected. Consider adjusting
3507 the validation or privacy settings in the config"
3508 );
3509 }
3510 }
3511 _ => {}
3512 }
3513 Ok(())
3514}
3515
3516impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
3517 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3518 f.debug_struct("Behaviour")
3519 .field("config", &self.config)
3520 .field("events", &self.events.len())
3521 .field("publish_config", &self.publish_config)
3522 .field("mesh", &self.mesh)
3523 .field("fanout", &self.fanout)
3524 .field("fanout_last_pub", &self.fanout_last_pub)
3525 .field("mcache", &self.mcache)
3526 .field("heartbeat", &self.heartbeat)
3527 .finish()
3528 }
3529}
3530
3531impl fmt::Debug for PublishConfig {
3532 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3533 match self {
3534 PublishConfig::Signing { author, .. } => {
3535 f.write_fmt(format_args!("PublishConfig::Signing({author})"))
3536 }
3537 PublishConfig::Author(author) => {
3538 f.write_fmt(format_args!("PublishConfig::Author({author})"))
3539 }
3540 PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3541 PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3542 }
3543 }
3544}