libp2p_gossipsub/
config.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{borrow::Cow, sync::Arc, time::Duration};
22
23use libp2p_identity::PeerId;
24use libp2p_swarm::StreamProtocol;
25
26use crate::{
27    error::ConfigBuilderError,
28    protocol::{ProtocolConfig, ProtocolId, FLOODSUB_PROTOCOL},
29    types::{Message, MessageId, PeerKind},
30};
31
32/// The types of message validation that can be employed by gossipsub.
33#[derive(Debug, Clone)]
34pub enum ValidationMode {
35    /// This is the default setting. This requires the message author to be a valid [`PeerId`] and
36    /// to be present as well as the sequence number. All messages must have valid signatures.
37    ///
38    /// NOTE: This setting will reject messages from nodes using
39    /// [`crate::behaviour::MessageAuthenticity::Anonymous`] and all messages that do not have
40    /// signatures.
41    Strict,
42    /// This setting permits messages that have no author, sequence number or signature. If any of
43    /// these fields exist in the message these are validated.
44    Permissive,
45    /// This setting requires the author, sequence number and signature fields of a message to be
46    /// empty. Any message that contains these fields is considered invalid.
47    Anonymous,
48    /// This setting does not check the author, sequence number or signature fields of incoming
49    /// messages. If these fields contain data, they are simply ignored.
50    ///
51    /// NOTE: This setting will consider messages with invalid signatures as valid messages.
52    None,
53}
54
55/// Selector for custom Protocol Id
56#[derive(Clone, Debug, PartialEq, Eq)]
57pub enum Version {
58    V1_0,
59    V1_1,
60}
61
62/// Configuration parameters that define the performance of the gossipsub network.
63#[derive(Clone)]
64pub struct Config {
65    protocol: ProtocolConfig,
66    history_length: usize,
67    history_gossip: usize,
68    mesh_n: usize,
69    mesh_n_low: usize,
70    mesh_n_high: usize,
71    retain_scores: usize,
72    gossip_lazy: usize,
73    gossip_factor: f64,
74    heartbeat_initial_delay: Duration,
75    heartbeat_interval: Duration,
76    fanout_ttl: Duration,
77    check_explicit_peers_ticks: u64,
78    duplicate_cache_time: Duration,
79    validate_messages: bool,
80    message_id_fn: Arc<dyn Fn(&Message) -> MessageId + Send + Sync + 'static>,
81    allow_self_origin: bool,
82    do_px: bool,
83    prune_peers: usize,
84    prune_backoff: Duration,
85    unsubscribe_backoff: Duration,
86    backoff_slack: u32,
87    flood_publish: bool,
88    graft_flood_threshold: Duration,
89    mesh_outbound_min: usize,
90    opportunistic_graft_ticks: u64,
91    opportunistic_graft_peers: usize,
92    gossip_retransimission: u32,
93    max_messages_per_rpc: Option<usize>,
94    max_ihave_length: usize,
95    max_ihave_messages: usize,
96    iwant_followup_time: Duration,
97    published_message_ids_cache_time: Duration,
98    connection_handler_queue_len: usize,
99    connection_handler_publish_duration: Duration,
100    connection_handler_forward_duration: Duration,
101    idontwant_message_size_threshold: usize,
102    idontwant_on_publish: bool,
103}
104
105impl Config {
106    pub(crate) fn protocol_config(&self) -> ProtocolConfig {
107        self.protocol.clone()
108    }
109
110    // Overlay network parameters.
111    /// Number of heartbeats to keep in the `memcache` (default is 5).
112    pub fn history_length(&self) -> usize {
113        self.history_length
114    }
115
116    /// Number of past heartbeats to gossip about (default is 3).
117    pub fn history_gossip(&self) -> usize {
118        self.history_gossip
119    }
120
121    /// Target number of peers for the mesh network (D in the spec, default is 6).
122    pub fn mesh_n(&self) -> usize {
123        self.mesh_n
124    }
125
126    /// Minimum number of peers in mesh network before adding more (D_lo in the spec, default is 5).
127    pub fn mesh_n_low(&self) -> usize {
128        self.mesh_n_low
129    }
130
131    /// Maximum number of peers in mesh network before removing some (D_high in the spec, default
132    /// is 12).
133    pub fn mesh_n_high(&self) -> usize {
134        self.mesh_n_high
135    }
136
137    /// Affects how peers are selected when pruning a mesh due to over subscription.
138    ///
139    ///  At least `retain_scores` of the retained peers will be high-scoring, while the remainder
140    /// are  chosen randomly (D_score in the spec, default is 4).
141    pub fn retain_scores(&self) -> usize {
142        self.retain_scores
143    }
144
145    /// Minimum number of peers to emit gossip to during a heartbeat (D_lazy in the spec,
146    /// default is 6).
147    pub fn gossip_lazy(&self) -> usize {
148        self.gossip_lazy
149    }
150
151    /// Affects how many peers we will emit gossip to at each heartbeat.
152    ///
153    /// We will send gossip to `gossip_factor * (total number of non-mesh peers)`, or
154    /// `gossip_lazy`, whichever is greater. The default is 0.25.
155    pub fn gossip_factor(&self) -> f64 {
156        self.gossip_factor
157    }
158
159    /// Initial delay in each heartbeat (default is 5 seconds).
160    pub fn heartbeat_initial_delay(&self) -> Duration {
161        self.heartbeat_initial_delay
162    }
163
164    /// Time between each heartbeat (default is 1 second).
165    pub fn heartbeat_interval(&self) -> Duration {
166        self.heartbeat_interval
167    }
168
169    /// Time to live for fanout peers (default is 60 seconds).
170    pub fn fanout_ttl(&self) -> Duration {
171        self.fanout_ttl
172    }
173
174    /// The number of heartbeat ticks until we recheck the connection to explicit peers and
175    /// reconnecting if necessary (default 300).
176    pub fn check_explicit_peers_ticks(&self) -> u64 {
177        self.check_explicit_peers_ticks
178    }
179
180    /// The maximum byte size for each gossipsub RPC (default is 65536 bytes).
181    ///
182    /// This represents the maximum size of the published message. It is additionally wrapped
183    /// in a protobuf struct, so the actual wire size may be a bit larger. It must be at least
184    /// large enough to support basic control messages. If Peer eXchange is enabled, this
185    /// must be large enough to transmit the desired peer information on pruning. It must be at
186    /// least 100 bytes. Default is 65536 bytes.
187    pub fn max_transmit_size(&self) -> usize {
188        self.protocol.max_transmit_size
189    }
190
191    /// Duplicates are prevented by storing message id's of known messages in an LRU time cache.
192    /// This settings sets the time period that messages are stored in the cache. Duplicates can be
193    /// received if duplicate messages are sent at a time greater than this setting apart. The
194    /// default is 1 minute.
195    pub fn duplicate_cache_time(&self) -> Duration {
196        self.duplicate_cache_time
197    }
198
199    /// When set to `true`, prevents automatic forwarding of all received messages. This setting
200    /// allows a user to validate the messages before propagating them to their peers. If set to
201    /// true, the user must manually call [`crate::Behaviour::report_message_validation_result()`]
202    /// on the behaviour to forward message once validated (default is `false`).
203    /// The default is `false`.
204    pub fn validate_messages(&self) -> bool {
205        self.validate_messages
206    }
207
208    /// Determines the level of validation used when receiving messages. See [`ValidationMode`]
209    /// for the available types. The default is ValidationMode::Strict.
210    pub fn validation_mode(&self) -> &ValidationMode {
211        &self.protocol.validation_mode
212    }
213
214    /// A user-defined function allowing the user to specify the message id of a gossipsub message.
215    /// The default value is to concatenate the source peer id with a sequence number. Setting this
216    /// parameter allows the user to address packets arbitrarily. One example is content based
217    /// addressing, where this function may be set to `hash(message)`. This would prevent messages
218    /// of the same content from being duplicated.
219    ///
220    /// The function takes a [`Message`] as input and outputs a String to be interpreted as
221    /// the message id.
222    pub fn message_id(&self, message: &Message) -> MessageId {
223        (self.message_id_fn)(message)
224    }
225
226    /// By default, gossipsub will reject messages that are sent to us that have the same message
227    /// source as we have specified locally. Enabling this, allows these messages and prevents
228    /// penalizing the peer that sent us the message. Default is false.
229    pub fn allow_self_origin(&self) -> bool {
230        self.allow_self_origin
231    }
232
233    /// Whether Peer eXchange is enabled; this should be enabled in bootstrappers and other well
234    /// connected/trusted nodes. The default is false.
235    ///
236    /// Note: Peer exchange is not implemented today, see
237    /// <https://github.com/libp2p/rust-libp2p/issues/2398>.
238    pub fn do_px(&self) -> bool {
239        self.do_px
240    }
241
242    /// Controls the number of peers to include in prune Peer eXchange.
243    /// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
244    /// send them signed peer records for up to `prune_peers` other peers that we
245    /// know of. It is recommended that this value is larger than `mesh_n_high` so that the pruned
246    /// peer can reliably form a full mesh. The default is typically 16 however until signed
247    /// records are spec'd this is disabled and set to 0.
248    pub fn prune_peers(&self) -> usize {
249        self.prune_peers
250    }
251
252    /// Controls the backoff time for pruned peers. This is how long
253    /// a peer must wait before attempting to graft into our mesh again after being pruned.
254    /// When pruning a peer, we send them our value of `prune_backoff` so they know
255    /// the minimum time to wait. Peers running older versions may not send a backoff time,
256    /// so if we receive a prune message without one, we will wait at least `prune_backoff`
257    /// before attempting to re-graft. The default is one minute.
258    pub fn prune_backoff(&self) -> Duration {
259        self.prune_backoff
260    }
261
262    /// Controls the backoff time when unsubscribing from a topic.
263    ///
264    /// This is how long to wait before resubscribing to the topic. A short backoff period in case
265    /// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default
266    /// is 10 seconds.
267    pub fn unsubscribe_backoff(&self) -> Duration {
268        self.unsubscribe_backoff
269    }
270
271    /// Number of heartbeat slots considered as slack for backoffs. This guarantees that we wait
272    /// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
273    /// solves problems occurring through high latencies. In particular if
274    /// `backoff_slack * heartbeat_interval` is longer than any latencies between processing
275    /// prunes on our side and processing prunes on the receiving side this guarantees that we
276    /// get not punished for too early grafting. The default is 1.
277    pub fn backoff_slack(&self) -> u32 {
278        self.backoff_slack
279    }
280
281    /// Whether to do flood publishing or not. If enabled newly created messages will always be
282    /// sent to all peers that are subscribed to the topic and have a good enough score.
283    /// The default is true.
284    pub fn flood_publish(&self) -> bool {
285        self.flood_publish
286    }
287
288    /// If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE,
289    /// then there is an extra score penalty applied to the peer through P7.
290    pub fn graft_flood_threshold(&self) -> Duration {
291        self.graft_flood_threshold
292    }
293
294    /// Minimum number of outbound peers in the mesh network before adding more (D_out in the spec).
295    /// This value must be smaller or equal than `mesh_n / 2` and smaller than `mesh_n_low`.
296    /// The default is 2.
297    pub fn mesh_outbound_min(&self) -> usize {
298        self.mesh_outbound_min
299    }
300
301    /// Number of heartbeat ticks that specify the interval in which opportunistic grafting is
302    /// applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh
303    /// peers to replace lower-scoring ones, if the median score of our mesh peers falls below a
304    /// threshold (see <https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds>).
305    /// The default is 60.
306    pub fn opportunistic_graft_ticks(&self) -> u64 {
307        self.opportunistic_graft_ticks
308    }
309
310    /// Controls how many times we will allow a peer to request the same message id through IWANT
311    /// gossip before we start ignoring them. This is designed to prevent peers from spamming us
312    /// with requests and wasting our resources. The default is 3.
313    pub fn gossip_retransimission(&self) -> u32 {
314        self.gossip_retransimission
315    }
316
317    /// The maximum number of new peers to graft to during opportunistic grafting. The default is 2.
318    pub fn opportunistic_graft_peers(&self) -> usize {
319        self.opportunistic_graft_peers
320    }
321
322    /// The maximum number of messages we will process in a given RPC. If this is unset, there is
323    /// no limit. The default is None.
324    pub fn max_messages_per_rpc(&self) -> Option<usize> {
325        self.max_messages_per_rpc
326    }
327
328    /// The maximum number of messages to include in an IHAVE message.
329    /// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
330    /// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
331    /// default if your system is pushing more than 5000 messages in GossipSubHistoryGossip
332    /// heartbeats; with the defaults this is 1666 messages/s. The default is 5000.
333    pub fn max_ihave_length(&self) -> usize {
334        self.max_ihave_length
335    }
336
337    /// GossipSubMaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer
338    /// within a heartbeat.
339    pub fn max_ihave_messages(&self) -> usize {
340        self.max_ihave_messages
341    }
342
343    /// Time to wait for a message requested through IWANT following an IHAVE advertisement.
344    /// If the message is not received within this window, a broken promise is declared and
345    /// the router may apply behavioural penalties. The default is 3 seconds.
346    pub fn iwant_followup_time(&self) -> Duration {
347        self.iwant_followup_time
348    }
349
350    /// Enable support for flooodsub peers. Default false.
351    pub fn support_floodsub(&self) -> bool {
352        self.protocol.protocol_ids.contains(&FLOODSUB_PROTOCOL)
353    }
354
355    /// Published message ids time cache duration. The default is 10 seconds.
356    pub fn published_message_ids_cache_time(&self) -> Duration {
357        self.published_message_ids_cache_time
358    }
359
360    /// The max number of messages a `ConnectionHandler` can buffer. The default is 5000.
361    pub fn connection_handler_queue_len(&self) -> usize {
362        self.connection_handler_queue_len
363    }
364
365    /// The duration a message to be published can wait to be sent before it is abandoned. The
366    /// default is 5 seconds.
367    pub fn publish_queue_duration(&self) -> Duration {
368        self.connection_handler_publish_duration
369    }
370
371    /// The duration a message to be forwarded can wait to be sent before it is abandoned. The
372    /// default is 1s.
373    pub fn forward_queue_duration(&self) -> Duration {
374        self.connection_handler_forward_duration
375    }
376
377    /// The message size threshold for which IDONTWANT messages are sent.
378    /// Sending IDONTWANT messages for small messages can have a negative effect to the overall
379    /// traffic and CPU load. This acts as a lower bound cutoff for the message size to which
380    /// IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2
381    /// (see <https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message>)
382    /// default is 1kB
383    pub fn idontwant_message_size_threshold(&self) -> usize {
384        self.idontwant_message_size_threshold
385    }
386
387    /// Send IDONTWANT messages after publishing message on gossip. This is an optimisation
388    /// to avoid bandwidth consumption by downloading the published message over gossip.
389    /// By default it is false.
390    pub fn idontwant_on_publish(&self) -> bool {
391        self.idontwant_on_publish
392    }
393}
394
395impl Default for Config {
396    fn default() -> Self {
397        // use ConfigBuilder to also validate defaults
398        ConfigBuilder::default()
399            .build()
400            .expect("Default config parameters should be valid parameters")
401    }
402}
403
404/// The builder struct for constructing a gossipsub configuration.
405pub struct ConfigBuilder {
406    config: Config,
407    invalid_protocol: bool, // This is a bit of a hack to only expose one error to the user.
408}
409
410impl Default for ConfigBuilder {
411    fn default() -> Self {
412        ConfigBuilder {
413            config: Config {
414                protocol: ProtocolConfig::default(),
415                history_length: 5,
416                history_gossip: 3,
417                mesh_n: 6,
418                mesh_n_low: 5,
419                mesh_n_high: 12,
420                retain_scores: 4,
421                gossip_lazy: 6, // default to mesh_n
422                gossip_factor: 0.25,
423                heartbeat_initial_delay: Duration::from_secs(5),
424                heartbeat_interval: Duration::from_secs(1),
425                fanout_ttl: Duration::from_secs(60),
426                check_explicit_peers_ticks: 300,
427                duplicate_cache_time: Duration::from_secs(60),
428                validate_messages: false,
429                message_id_fn: Arc::new(|message| {
430                    // default message id is: source + sequence number
431                    // NOTE: If either the peer_id or source is not provided, we set to 0;
432                    let mut source_string = if let Some(peer_id) = message.source.as_ref() {
433                        peer_id.to_base58()
434                    } else {
435                        PeerId::from_bytes(&[0, 1, 0])
436                            .expect("Valid peer id")
437                            .to_base58()
438                    };
439                    source_string
440                        .push_str(&message.sequence_number.unwrap_or_default().to_string());
441                    MessageId::from(source_string)
442                }),
443                allow_self_origin: false,
444                do_px: false,
445                // NOTE: Increasing this currently has little effect until Signed
446                // records are implemented.
447                prune_peers: 0,
448                prune_backoff: Duration::from_secs(60),
449                unsubscribe_backoff: Duration::from_secs(10),
450                backoff_slack: 1,
451                flood_publish: true,
452                graft_flood_threshold: Duration::from_secs(10),
453                mesh_outbound_min: 2,
454                opportunistic_graft_ticks: 60,
455                opportunistic_graft_peers: 2,
456                gossip_retransimission: 3,
457                max_messages_per_rpc: None,
458                max_ihave_length: 5000,
459                max_ihave_messages: 10,
460                iwant_followup_time: Duration::from_secs(3),
461                published_message_ids_cache_time: Duration::from_secs(10),
462                connection_handler_queue_len: 5000,
463                connection_handler_publish_duration: Duration::from_secs(5),
464                connection_handler_forward_duration: Duration::from_secs(1),
465                idontwant_message_size_threshold: 1000,
466                idontwant_on_publish: false,
467            },
468            invalid_protocol: false,
469        }
470    }
471}
472
473impl From<Config> for ConfigBuilder {
474    fn from(config: Config) -> Self {
475        ConfigBuilder {
476            config,
477            invalid_protocol: false,
478        }
479    }
480}
481
482impl ConfigBuilder {
483    /// The protocol id prefix to negotiate this protocol (default is `/meshsub/1.1.0` and
484    /// `/meshsub/1.0.0`).
485    pub fn protocol_id_prefix(
486        &mut self,
487        protocol_id_prefix: impl Into<Cow<'static, str>>,
488    ) -> &mut Self {
489        let cow = protocol_id_prefix.into();
490
491        match (
492            StreamProtocol::try_from_owned(format!("{}/1.1.0", cow)),
493            StreamProtocol::try_from_owned(format!("{}/1.0.0", cow)),
494        ) {
495            (Ok(p1), Ok(p2)) => {
496                self.config.protocol.protocol_ids = vec![
497                    ProtocolId {
498                        protocol: p1,
499                        kind: PeerKind::Gossipsubv1_1,
500                    },
501                    ProtocolId {
502                        protocol: p2,
503                        kind: PeerKind::Gossipsub,
504                    },
505                ]
506            }
507            _ => {
508                self.invalid_protocol = true;
509            }
510        }
511
512        self
513    }
514
515    /// The full protocol id to negotiate this protocol (does not append `/1.0.0` or `/1.1.0`).
516    pub fn protocol_id(
517        &mut self,
518        protocol_id: impl Into<Cow<'static, str>>,
519        custom_id_version: Version,
520    ) -> &mut Self {
521        let cow = protocol_id.into();
522
523        match StreamProtocol::try_from_owned(cow.to_string()) {
524            Ok(protocol) => {
525                self.config.protocol.protocol_ids = vec![ProtocolId {
526                    protocol,
527                    kind: match custom_id_version {
528                        Version::V1_1 => PeerKind::Gossipsubv1_1,
529                        Version::V1_0 => PeerKind::Gossipsub,
530                    },
531                }]
532            }
533            _ => {
534                self.invalid_protocol = true;
535            }
536        }
537
538        self
539    }
540
541    /// Number of heartbeats to keep in the `memcache` (default is 5).
542    pub fn history_length(&mut self, history_length: usize) -> &mut Self {
543        self.config.history_length = history_length;
544        self
545    }
546
547    /// Number of past heartbeats to gossip about (default is 3).
548    pub fn history_gossip(&mut self, history_gossip: usize) -> &mut Self {
549        self.config.history_gossip = history_gossip;
550        self
551    }
552
553    /// Target number of peers for the mesh network (D in the spec, default is 6).
554    pub fn mesh_n(&mut self, mesh_n: usize) -> &mut Self {
555        self.config.mesh_n = mesh_n;
556        self
557    }
558
559    /// Minimum number of peers in mesh network before adding more (D_lo in the spec, default is 4).
560    pub fn mesh_n_low(&mut self, mesh_n_low: usize) -> &mut Self {
561        self.config.mesh_n_low = mesh_n_low;
562        self
563    }
564
565    /// Maximum number of peers in mesh network before removing some (D_high in the spec, default
566    /// is 12).
567    pub fn mesh_n_high(&mut self, mesh_n_high: usize) -> &mut Self {
568        self.config.mesh_n_high = mesh_n_high;
569        self
570    }
571
572    /// Affects how peers are selected when pruning a mesh due to over subscription.
573    ///
574    /// At least [`Self::retain_scores`] of the retained peers will be high-scoring, while the
575    /// remainder are chosen randomly (D_score in the spec, default is 4).
576    pub fn retain_scores(&mut self, retain_scores: usize) -> &mut Self {
577        self.config.retain_scores = retain_scores;
578        self
579    }
580
581    /// Minimum number of peers to emit gossip to during a heartbeat (D_lazy in the spec,
582    /// default is 6).
583    pub fn gossip_lazy(&mut self, gossip_lazy: usize) -> &mut Self {
584        self.config.gossip_lazy = gossip_lazy;
585        self
586    }
587
588    /// Affects how many peers we will emit gossip to at each heartbeat.
589    ///
590    /// We will send gossip to `gossip_factor * (total number of non-mesh peers)`, or
591    /// `gossip_lazy`, whichever is greater. The default is 0.25.
592    pub fn gossip_factor(&mut self, gossip_factor: f64) -> &mut Self {
593        self.config.gossip_factor = gossip_factor;
594        self
595    }
596
597    /// Initial delay in each heartbeat (default is 5 seconds).
598    pub fn heartbeat_initial_delay(&mut self, heartbeat_initial_delay: Duration) -> &mut Self {
599        self.config.heartbeat_initial_delay = heartbeat_initial_delay;
600        self
601    }
602
603    /// Time between each heartbeat (default is 1 second).
604    pub fn heartbeat_interval(&mut self, heartbeat_interval: Duration) -> &mut Self {
605        self.config.heartbeat_interval = heartbeat_interval;
606        self
607    }
608
609    /// The number of heartbeat ticks until we recheck the connection to explicit peers and
610    /// reconnecting if necessary (default 300).
611    pub fn check_explicit_peers_ticks(&mut self, check_explicit_peers_ticks: u64) -> &mut Self {
612        self.config.check_explicit_peers_ticks = check_explicit_peers_ticks;
613        self
614    }
615
616    /// Time to live for fanout peers (default is 60 seconds).
617    pub fn fanout_ttl(&mut self, fanout_ttl: Duration) -> &mut Self {
618        self.config.fanout_ttl = fanout_ttl;
619        self
620    }
621
622    /// The maximum byte size for each gossip (default is 2048 bytes).
623    pub fn max_transmit_size(&mut self, max_transmit_size: usize) -> &mut Self {
624        self.config.protocol.max_transmit_size = max_transmit_size;
625        self
626    }
627
628    /// Duplicates are prevented by storing message id's of known messages in an LRU time cache.
629    /// This settings sets the time period that messages are stored in the cache. Duplicates can be
630    /// received if duplicate messages are sent at a time greater than this setting apart. The
631    /// default is 1 minute.
632    pub fn duplicate_cache_time(&mut self, cache_size: Duration) -> &mut Self {
633        self.config.duplicate_cache_time = cache_size;
634        self
635    }
636
637    /// When set, prevents automatic forwarding of all received messages. This setting
638    /// allows a user to validate the messages before propagating them to their peers. If set,
639    /// the user must manually call [`crate::Behaviour::report_message_validation_result()`] on the
640    /// behaviour to forward a message once validated.
641    pub fn validate_messages(&mut self) -> &mut Self {
642        self.config.validate_messages = true;
643        self
644    }
645
646    /// Determines the level of validation used when receiving messages. See [`ValidationMode`]
647    /// for the available types. The default is ValidationMode::Strict.
648    pub fn validation_mode(&mut self, validation_mode: ValidationMode) -> &mut Self {
649        self.config.protocol.validation_mode = validation_mode;
650        self
651    }
652
653    /// A user-defined function allowing the user to specify the message id of a gossipsub message.
654    /// The default value is to concatenate the source peer id with a sequence number. Setting this
655    /// parameter allows the user to address packets arbitrarily. One example is content based
656    /// addressing, where this function may be set to `hash(message)`. This would prevent messages
657    /// of the same content from being duplicated.
658    ///
659    /// The function takes a [`Message`] as input and outputs a String to be
660    /// interpreted as the message id.
661    pub fn message_id_fn<F>(&mut self, id_fn: F) -> &mut Self
662    where
663        F: Fn(&Message) -> MessageId + Send + Sync + 'static,
664    {
665        self.config.message_id_fn = Arc::new(id_fn);
666        self
667    }
668
669    /// Enables Peer eXchange. This should be enabled in bootstrappers and other well
670    /// connected/trusted nodes. The default is false.
671    ///
672    /// Note: Peer exchange is not implemented today, see
673    /// <https://github.com/libp2p/rust-libp2p/issues/2398>.
674    pub fn do_px(&mut self) -> &mut Self {
675        self.config.do_px = true;
676        self
677    }
678
679    /// Controls the number of peers to include in prune Peer eXchange.
680    ///
681    /// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
682    /// send them signed peer records for up to [`Self::prune_peers] other peers that we
683    /// know of. It is recommended that this value is larger than [`Self::mesh_n_high`] so that the
684    /// pruned peer can reliably form a full mesh. The default is 16.
685    pub fn prune_peers(&mut self, prune_peers: usize) -> &mut Self {
686        self.config.prune_peers = prune_peers;
687        self
688    }
689
690    /// Controls the backoff time for pruned peers. This is how long
691    /// a peer must wait before attempting to graft into our mesh again after being pruned.
692    /// When pruning a peer, we send them our value of [`Self::prune_backoff`] so they know
693    /// the minimum time to wait. Peers running older versions may not send a backoff time,
694    /// so if we receive a prune message without one, we will wait at least [`Self::prune_backoff`]
695    /// before attempting to re-graft. The default is one minute.
696    pub fn prune_backoff(&mut self, prune_backoff: Duration) -> &mut Self {
697        self.config.prune_backoff = prune_backoff;
698        self
699    }
700
701    /// Controls the backoff time when unsubscribing from a topic.
702    ///
703    /// This is how long to wait before resubscribing to the topic. A short backoff period in case
704    /// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default
705    /// is 10 seconds.
706    pub fn unsubscribe_backoff(&mut self, unsubscribe_backoff: u64) -> &mut Self {
707        self.config.unsubscribe_backoff = Duration::from_secs(unsubscribe_backoff);
708        self
709    }
710
711    /// Number of heartbeat slots considered as slack for backoffs. This guarantees that we wait
712    /// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
713    /// solves problems occurring through high latencies. In particular if
714    /// `backoff_slack * heartbeat_interval` is longer than any latencies between processing
715    /// prunes on our side and processing prunes on the receiving side this guarantees that we
716    /// get not punished for too early grafting. The default is 1.
717    pub fn backoff_slack(&mut self, backoff_slack: u32) -> &mut Self {
718        self.config.backoff_slack = backoff_slack;
719        self
720    }
721
722    /// Whether to do flood publishing or not. If enabled newly created messages will always be
723    /// sent to all peers that are subscribed to the topic and have a good enough score.
724    /// The default is true.
725    pub fn flood_publish(&mut self, flood_publish: bool) -> &mut Self {
726        self.config.flood_publish = flood_publish;
727        self
728    }
729
730    /// If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE,
731    /// then there is an extra score penalty applied to the peer through P7.
732    pub fn graft_flood_threshold(&mut self, graft_flood_threshold: Duration) -> &mut Self {
733        self.config.graft_flood_threshold = graft_flood_threshold;
734        self
735    }
736
737    /// Minimum number of outbound peers in the mesh network before adding more (D_out in the spec).
738    /// This value must be smaller or equal than `mesh_n / 2` and smaller than `mesh_n_low`.
739    /// The default is 2.
740    pub fn mesh_outbound_min(&mut self, mesh_outbound_min: usize) -> &mut Self {
741        self.config.mesh_outbound_min = mesh_outbound_min;
742        self
743    }
744
745    /// Number of heartbeat ticks that specify the interval in which opportunistic grafting is
746    /// applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh
747    /// peers to replace lower-scoring ones, if the median score of our mesh peers falls below a
748    /// threshold (see <https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds>).
749    /// The default is 60.
750    pub fn opportunistic_graft_ticks(&mut self, opportunistic_graft_ticks: u64) -> &mut Self {
751        self.config.opportunistic_graft_ticks = opportunistic_graft_ticks;
752        self
753    }
754
755    /// Controls how many times we will allow a peer to request the same message id through IWANT
756    /// gossip before we start ignoring them. This is designed to prevent peers from spamming us
757    /// with requests and wasting our resources.
758    pub fn gossip_retransimission(&mut self, gossip_retransimission: u32) -> &mut Self {
759        self.config.gossip_retransimission = gossip_retransimission;
760        self
761    }
762
763    /// The maximum number of new peers to graft to during opportunistic grafting. The default is 2.
764    pub fn opportunistic_graft_peers(&mut self, opportunistic_graft_peers: usize) -> &mut Self {
765        self.config.opportunistic_graft_peers = opportunistic_graft_peers;
766        self
767    }
768
769    /// The maximum number of messages we will process in a given RPC. If this is unset, there is
770    /// no limit. The default is None.
771    pub fn max_messages_per_rpc(&mut self, max: Option<usize>) -> &mut Self {
772        self.config.max_messages_per_rpc = max;
773        self
774    }
775
776    /// The maximum number of messages to include in an IHAVE message.
777    /// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
778    /// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
779    /// default if your system is pushing more than 5000 messages in GossipSubHistoryGossip
780    /// heartbeats; with the defaults this is 1666 messages/s. The default is 5000.
781    pub fn max_ihave_length(&mut self, max_ihave_length: usize) -> &mut Self {
782        self.config.max_ihave_length = max_ihave_length;
783        self
784    }
785
786    /// GossipSubMaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer
787    /// within a heartbeat.
788    pub fn max_ihave_messages(&mut self, max_ihave_messages: usize) -> &mut Self {
789        self.config.max_ihave_messages = max_ihave_messages;
790        self
791    }
792
793    /// By default, gossipsub will reject messages that are sent to us that has the same message
794    /// source as we have specified locally. Enabling this, allows these messages and prevents
795    /// penalizing the peer that sent us the message. Default is false.
796    pub fn allow_self_origin(&mut self, allow_self_origin: bool) -> &mut Self {
797        self.config.allow_self_origin = allow_self_origin;
798        self
799    }
800
801    /// Time to wait for a message requested through IWANT following an IHAVE advertisement.
802    /// If the message is not received within this window, a broken promise is declared and
803    /// the router may apply behavioural penalties. The default is 3 seconds.
804    pub fn iwant_followup_time(&mut self, iwant_followup_time: Duration) -> &mut Self {
805        self.config.iwant_followup_time = iwant_followup_time;
806        self
807    }
808
809    /// Enable support for flooodsub peers.
810    pub fn support_floodsub(&mut self) -> &mut Self {
811        if self
812            .config
813            .protocol
814            .protocol_ids
815            .contains(&FLOODSUB_PROTOCOL)
816        {
817            return self;
818        }
819
820        self.config.protocol.protocol_ids.push(FLOODSUB_PROTOCOL);
821        self
822    }
823
824    /// Published message ids time cache duration. The default is 10 seconds.
825    pub fn published_message_ids_cache_time(
826        &mut self,
827        published_message_ids_cache_time: Duration,
828    ) -> &mut Self {
829        self.config.published_message_ids_cache_time = published_message_ids_cache_time;
830        self
831    }
832
833    /// The max number of messages a `ConnectionHandler` can buffer. The default is 5000.
834    pub fn connection_handler_queue_len(&mut self, len: usize) -> &mut Self {
835        self.config.connection_handler_queue_len = len;
836        self
837    }
838
839    /// The duration a message to be published can wait to be sent before it is abandoned. The
840    /// default is 5 seconds.
841    pub fn publish_queue_duration(&mut self, duration: Duration) -> &mut Self {
842        self.config.connection_handler_publish_duration = duration;
843        self
844    }
845
846    /// The duration a message to be forwarded can wait to be sent before it is abandoned. The
847    /// default is 1s.
848    pub fn forward_queue_duration(&mut self, duration: Duration) -> &mut Self {
849        self.config.connection_handler_forward_duration = duration;
850        self
851    }
852
853    /// The message size threshold for which IDONTWANT messages are sent.
854    /// Sending IDONTWANT messages for small messages can have a negative effect to the overall
855    /// traffic and CPU load. This acts as a lower bound cutoff for the message size to which
856    /// IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2
857    /// (see <https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message>)
858    /// default is 1kB
859    pub fn idontwant_message_size_threshold(&mut self, size: usize) -> &mut Self {
860        self.config.idontwant_message_size_threshold = size;
861        self
862    }
863
864    /// Send IDONTWANT messages after publishing message on gossip. This is an optimisation
865    /// to avoid bandwidth consumption by downloading the published message over gossip.
866    /// By default it is false.
867    pub fn idontwant_on_publish(&mut self, idontwant_on_publish: bool) -> &mut Self {
868        self.config.idontwant_on_publish = idontwant_on_publish;
869        self
870    }
871
872    /// Constructs a [`Config`] from the given configuration and validates the settings.
873    pub fn build(&self) -> Result<Config, ConfigBuilderError> {
874        // check all constraints on config
875
876        if self.config.protocol.max_transmit_size < 100 {
877            return Err(ConfigBuilderError::MaxTransmissionSizeTooSmall);
878        }
879
880        if self.config.history_length < self.config.history_gossip {
881            return Err(ConfigBuilderError::HistoryLengthTooSmall);
882        }
883
884        if !(self.config.mesh_outbound_min <= self.config.mesh_n_low
885            && self.config.mesh_n_low <= self.config.mesh_n
886            && self.config.mesh_n <= self.config.mesh_n_high)
887        {
888            return Err(ConfigBuilderError::MeshParametersInvalid);
889        }
890
891        if self.config.mesh_outbound_min * 2 > self.config.mesh_n {
892            return Err(ConfigBuilderError::MeshOutboundInvalid);
893        }
894
895        if self.config.unsubscribe_backoff.as_millis() == 0 {
896            return Err(ConfigBuilderError::UnsubscribeBackoffIsZero);
897        }
898
899        if self.invalid_protocol {
900            return Err(ConfigBuilderError::InvalidProtocol);
901        }
902
903        Ok(self.config.clone())
904    }
905}
906
907impl std::fmt::Debug for Config {
908    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
909        let mut builder = f.debug_struct("GossipsubConfig");
910        let _ = builder.field("protocol", &self.protocol);
911        let _ = builder.field("history_length", &self.history_length);
912        let _ = builder.field("history_gossip", &self.history_gossip);
913        let _ = builder.field("mesh_n", &self.mesh_n);
914        let _ = builder.field("mesh_n_low", &self.mesh_n_low);
915        let _ = builder.field("mesh_n_high", &self.mesh_n_high);
916        let _ = builder.field("retain_scores", &self.retain_scores);
917        let _ = builder.field("gossip_lazy", &self.gossip_lazy);
918        let _ = builder.field("gossip_factor", &self.gossip_factor);
919        let _ = builder.field("heartbeat_initial_delay", &self.heartbeat_initial_delay);
920        let _ = builder.field("heartbeat_interval", &self.heartbeat_interval);
921        let _ = builder.field("fanout_ttl", &self.fanout_ttl);
922        let _ = builder.field("duplicate_cache_time", &self.duplicate_cache_time);
923        let _ = builder.field("validate_messages", &self.validate_messages);
924        let _ = builder.field("allow_self_origin", &self.allow_self_origin);
925        let _ = builder.field("do_px", &self.do_px);
926        let _ = builder.field("prune_peers", &self.prune_peers);
927        let _ = builder.field("prune_backoff", &self.prune_backoff);
928        let _ = builder.field("backoff_slack", &self.backoff_slack);
929        let _ = builder.field("flood_publish", &self.flood_publish);
930        let _ = builder.field("graft_flood_threshold", &self.graft_flood_threshold);
931        let _ = builder.field("mesh_outbound_min", &self.mesh_outbound_min);
932        let _ = builder.field("opportunistic_graft_ticks", &self.opportunistic_graft_ticks);
933        let _ = builder.field("opportunistic_graft_peers", &self.opportunistic_graft_peers);
934        let _ = builder.field("max_messages_per_rpc", &self.max_messages_per_rpc);
935        let _ = builder.field("max_ihave_length", &self.max_ihave_length);
936        let _ = builder.field("max_ihave_messages", &self.max_ihave_messages);
937        let _ = builder.field("iwant_followup_time", &self.iwant_followup_time);
938        let _ = builder.field(
939            "published_message_ids_cache_time",
940            &self.published_message_ids_cache_time,
941        );
942        let _ = builder.field(
943            "idontwant_message_size_threhold",
944            &self.idontwant_message_size_threshold,
945        );
946        let _ = builder.field("idontwant_on_publish", &self.idontwant_on_publish);
947        builder.finish()
948    }
949}
950
951#[cfg(test)]
952mod test {
953    use std::{
954        collections::hash_map::DefaultHasher,
955        hash::{Hash, Hasher},
956    };
957
958    use libp2p_core::UpgradeInfo;
959
960    use super::*;
961    use crate::{topic::IdentityHash, Topic};
962
963    #[test]
964    fn create_config_with_message_id_as_plain_function() {
965        let config = ConfigBuilder::default()
966            .message_id_fn(message_id_plain_function)
967            .build()
968            .unwrap();
969
970        let result = config.message_id(&get_gossipsub_message());
971
972        assert_eq!(result, get_expected_message_id());
973    }
974
975    #[test]
976    fn create_config_with_message_id_as_closure() {
977        let config = ConfigBuilder::default()
978            .message_id_fn(|message: &Message| {
979                let mut s = DefaultHasher::new();
980                message.data.hash(&mut s);
981                let mut v = s.finish().to_string();
982                v.push('e');
983                MessageId::from(v)
984            })
985            .build()
986            .unwrap();
987
988        let result = config.message_id(&get_gossipsub_message());
989
990        assert_eq!(result, get_expected_message_id());
991    }
992
993    #[test]
994    fn create_config_with_message_id_as_closure_with_variable_capture() {
995        let captured: char = 'e';
996
997        let config = ConfigBuilder::default()
998            .message_id_fn(move |message: &Message| {
999                let mut s = DefaultHasher::new();
1000                message.data.hash(&mut s);
1001                let mut v = s.finish().to_string();
1002                v.push(captured);
1003                MessageId::from(v)
1004            })
1005            .build()
1006            .unwrap();
1007
1008        let result = config.message_id(&get_gossipsub_message());
1009
1010        assert_eq!(result, get_expected_message_id());
1011    }
1012
1013    #[test]
1014    fn create_config_with_protocol_id_prefix() {
1015        let protocol_config = ConfigBuilder::default()
1016            .protocol_id_prefix("/purple")
1017            .build()
1018            .unwrap()
1019            .protocol_config();
1020
1021        let protocol_ids = protocol_config.protocol_info();
1022
1023        assert_eq!(protocol_ids.len(), 2);
1024
1025        assert_eq!(
1026            protocol_ids[0].protocol,
1027            StreamProtocol::new("/purple/1.1.0")
1028        );
1029        assert_eq!(protocol_ids[0].kind, PeerKind::Gossipsubv1_1);
1030
1031        assert_eq!(
1032            protocol_ids[1].protocol,
1033            StreamProtocol::new("/purple/1.0.0")
1034        );
1035        assert_eq!(protocol_ids[1].kind, PeerKind::Gossipsub);
1036    }
1037
1038    #[test]
1039    fn create_config_with_custom_protocol_id() {
1040        let protocol_config = ConfigBuilder::default()
1041            .protocol_id("/purple", Version::V1_0)
1042            .build()
1043            .unwrap()
1044            .protocol_config();
1045
1046        let protocol_ids = protocol_config.protocol_info();
1047
1048        assert_eq!(protocol_ids.len(), 1);
1049
1050        assert_eq!(protocol_ids[0].protocol, "/purple");
1051        assert_eq!(protocol_ids[0].kind, PeerKind::Gossipsub);
1052    }
1053
1054    fn get_gossipsub_message() -> Message {
1055        Message {
1056            source: None,
1057            data: vec![12, 34, 56],
1058            sequence_number: None,
1059            topic: Topic::<IdentityHash>::new("test").hash(),
1060        }
1061    }
1062
1063    fn get_expected_message_id() -> MessageId {
1064        MessageId::from([
1065            49, 55, 56, 51, 56, 52, 49, 51, 52, 51, 52, 55, 51, 51, 53, 52, 54, 54, 52, 49, 101,
1066        ])
1067    }
1068
1069    fn message_id_plain_function(message: &Message) -> MessageId {
1070        let mut s = DefaultHasher::new();
1071        message.data.hash(&mut s);
1072        let mut v = s.finish().to_string();
1073        v.push('e');
1074        MessageId::from(v)
1075    }
1076}