libp2p_gossipsub/
types.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! A collection of types using the Gossipsub system.
22use std::{collections::BTreeSet, fmt, fmt::Debug};
23
24use futures_timer::Delay;
25use hashlink::LinkedHashMap;
26use libp2p_identity::PeerId;
27use libp2p_swarm::ConnectionId;
28use prometheus_client::encoding::EncodeLabelValue;
29use quick_protobuf::MessageWrite;
30#[cfg(feature = "serde")]
31use serde::{Deserialize, Serialize};
32use web_time::Instant;
33
34use crate::{rpc::Sender, rpc_proto::proto, TopicHash};
35
36/// Messages that have expired while attempting to be sent to a peer.
37#[derive(Clone, Debug, Default)]
38pub struct FailedMessages {
39    /// The number of publish messages that failed to be published in a heartbeat.
40    pub publish: usize,
41    /// The number of forward messages that failed to be published in a heartbeat.
42    pub forward: usize,
43    /// The number of messages that were failed to be sent to the priority queue as it was full.
44    pub priority: usize,
45    /// The number of messages that were failed to be sent to the non-priority queue as it was
46    /// full.
47    pub non_priority: usize,
48    /// The number of messages that timed out and could not be sent.
49    pub timeout: usize,
50}
51
52impl FailedMessages {
53    /// The total number of messages that failed due to the queue being full.
54    pub fn total_queue_full(&self) -> usize {
55        self.priority + self.non_priority
56    }
57
58    /// The total failed messages in a heartbeat.
59    pub fn total(&self) -> usize {
60        self.priority + self.non_priority
61    }
62}
63
64#[derive(Debug)]
65/// Validation kinds from the application for received messages.
66pub enum MessageAcceptance {
67    /// The message is considered valid, and it should be delivered and forwarded to the network.
68    Accept,
69    /// The message is considered invalid, and it should be rejected and trigger the P₄ penalty.
70    Reject,
71    /// The message is neither delivered nor forwarded to the network, but the router does not
72    /// trigger the P₄ penalty.
73    Ignore,
74}
75
76#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
77#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
78pub struct MessageId(pub Vec<u8>);
79
80impl MessageId {
81    pub fn new(value: &[u8]) -> Self {
82        Self(value.to_vec())
83    }
84}
85
86impl<T: Into<Vec<u8>>> From<T> for MessageId {
87    fn from(value: T) -> Self {
88        Self(value.into())
89    }
90}
91
92impl std::fmt::Display for MessageId {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        write!(f, "{}", hex_fmt::HexFmt(&self.0))
95    }
96}
97
98impl std::fmt::Debug for MessageId {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0))
101    }
102}
103
104#[derive(Debug)]
105pub(crate) struct PeerConnections {
106    /// The kind of protocol the peer supports.
107    pub(crate) kind: PeerKind,
108    /// Its current connections.
109    pub(crate) connections: Vec<ConnectionId>,
110    /// Subscribed topics.
111    pub(crate) topics: BTreeSet<TopicHash>,
112    /// The rpc sender to the connection handler(s).
113    pub(crate) sender: Sender,
114    /// Don't send messages.
115    pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
116}
117
118/// Describes the types of peers that can exist in the gossipsub context.
119#[derive(Debug, Clone, Copy, PartialEq, Hash, EncodeLabelValue, Eq)]
120pub enum PeerKind {
121    /// A gossipsub 1.2 peer.
122    Gossipsubv1_2,
123    /// A gossipsub 1.1 peer.
124    Gossipsubv1_1,
125    /// A gossipsub 1.0 peer.
126    Gossipsub,
127    /// A floodsub peer.
128    Floodsub,
129    /// The peer doesn't support any of the protocols.
130    NotSupported,
131}
132
133/// A message received by the gossipsub system and stored locally in caches..
134#[derive(Clone, PartialEq, Eq, Hash, Debug)]
135pub struct RawMessage {
136    /// Id of the peer that published this message.
137    pub source: Option<PeerId>,
138
139    /// Content of the message. Its meaning is out of scope of this library.
140    pub data: Vec<u8>,
141
142    /// A random sequence number.
143    pub sequence_number: Option<u64>,
144
145    /// The topic this message belongs to
146    pub topic: TopicHash,
147
148    /// The signature of the message if it's signed.
149    pub signature: Option<Vec<u8>>,
150
151    /// The public key of the message if it is signed and the source [`PeerId`] cannot be inlined.
152    pub key: Option<Vec<u8>>,
153
154    /// Flag indicating if this message has been validated by the application or not.
155    pub validated: bool,
156}
157
158impl PeerKind {
159    /// Returns true if peer speaks any gossipsub version.
160    pub(crate) fn is_gossipsub(&self) -> bool {
161        matches!(
162            self,
163            Self::Gossipsubv1_2 | Self::Gossipsubv1_1 | Self::Gossipsub
164        )
165    }
166}
167
168impl RawMessage {
169    /// Calculates the encoded length of this message (used for calculating metrics).
170    pub fn raw_protobuf_len(&self) -> usize {
171        let message = proto::Message {
172            from: self.source.map(|m| m.to_bytes()),
173            data: Some(self.data.clone()),
174            seqno: self.sequence_number.map(|s| s.to_be_bytes().to_vec()),
175            topic: TopicHash::into_string(self.topic.clone()),
176            signature: self.signature.clone(),
177            key: self.key.clone(),
178        };
179        message.get_size()
180    }
181}
182
183impl From<RawMessage> for proto::Message {
184    fn from(raw: RawMessage) -> Self {
185        proto::Message {
186            from: raw.source.map(|m| m.to_bytes()),
187            data: Some(raw.data),
188            seqno: raw.sequence_number.map(|s| s.to_be_bytes().to_vec()),
189            topic: TopicHash::into_string(raw.topic),
190            signature: raw.signature,
191            key: raw.key,
192        }
193    }
194}
195
196/// The message sent to the user after a [`RawMessage`] has been transformed by a
197/// [`crate::DataTransform`].
198#[derive(Clone, PartialEq, Eq, Hash)]
199pub struct Message {
200    /// Id of the peer that published this message.
201    pub source: Option<PeerId>,
202
203    /// Content of the message.
204    pub data: Vec<u8>,
205
206    /// A random sequence number.
207    pub sequence_number: Option<u64>,
208
209    /// The topic this message belongs to
210    pub topic: TopicHash,
211}
212
213impl fmt::Debug for Message {
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        f.debug_struct("Message")
216            .field(
217                "data",
218                &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)),
219            )
220            .field("source", &self.source)
221            .field("sequence_number", &self.sequence_number)
222            .field("topic", &self.topic)
223            .finish()
224    }
225}
226
227/// A subscription received by the gossipsub system.
228#[derive(Debug, Clone, PartialEq, Eq, Hash)]
229pub struct Subscription {
230    /// Action to perform.
231    pub action: SubscriptionAction,
232    /// The topic from which to subscribe or unsubscribe.
233    pub topic_hash: TopicHash,
234}
235
236/// Action that a subscription wants to perform.
237#[derive(Debug, Clone, PartialEq, Eq, Hash)]
238pub enum SubscriptionAction {
239    /// The remote wants to subscribe to the given topic.
240    Subscribe,
241    /// The remote wants to unsubscribe from the given topic.
242    Unsubscribe,
243}
244
245#[derive(Debug, Clone, PartialEq, Eq, Hash)]
246pub(crate) struct PeerInfo {
247    pub(crate) peer_id: Option<PeerId>,
248    // TODO add this when RFC: Signed Address Records got added to the spec (see pull request
249    // https://github.com/libp2p/specs/pull/217)
250    // pub signed_peer_record: ?,
251}
252
253/// A Control message received by the gossipsub system.
254#[derive(Debug, Clone, PartialEq, Eq, Hash)]
255pub enum ControlAction {
256    /// Node broadcasts known messages per topic - IHave control message.
257    IHave(IHave),
258    /// The node requests specific message ids (peer_id + sequence _number) - IWant control
259    /// message.
260    IWant(IWant),
261    /// The node has been added to the mesh - Graft control message.
262    Graft(Graft),
263    /// The node has been removed from the mesh - Prune control message.
264    Prune(Prune),
265    /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant
266    /// control message.
267    IDontWant(IDontWant),
268}
269
270/// Node broadcasts known messages per topic - IHave control message.
271#[derive(Debug, Clone, PartialEq, Eq, Hash)]
272pub struct IHave {
273    /// The topic of the messages.
274    pub(crate) topic_hash: TopicHash,
275    /// A list of known message ids (peer_id + sequence _number) as a string.
276    pub(crate) message_ids: Vec<MessageId>,
277}
278
279/// The node requests specific message ids (peer_id + sequence _number) - IWant control message.
280#[derive(Debug, Clone, PartialEq, Eq, Hash)]
281pub struct IWant {
282    /// A list of known message ids (peer_id + sequence _number) as a string.
283    pub(crate) message_ids: Vec<MessageId>,
284}
285
286/// The node has been added to the mesh - Graft control message.
287#[derive(Debug, Clone, PartialEq, Eq, Hash)]
288pub struct Graft {
289    /// The mesh topic the peer should be added to.
290    pub(crate) topic_hash: TopicHash,
291}
292
293/// The node has been removed from the mesh - Prune control message.
294#[derive(Debug, Clone, PartialEq, Eq, Hash)]
295pub struct Prune {
296    /// The mesh topic the peer should be removed from.
297    pub(crate) topic_hash: TopicHash,
298    /// A list of peers to be proposed to the removed peer as peer exchange
299    pub(crate) peers: Vec<PeerInfo>,
300    /// The backoff time in seconds before we allow to reconnect
301    pub(crate) backoff: Option<u64>,
302}
303
304/// The node requests us to not forward message ids - IDontWant control message.
305#[derive(Debug, Clone, PartialEq, Eq, Hash)]
306pub struct IDontWant {
307    /// A list of known message ids.
308    pub(crate) message_ids: Vec<MessageId>,
309}
310
311/// A Gossipsub RPC message sent.
312#[derive(Debug)]
313pub enum RpcOut {
314    /// Publish a Gossipsub message on network.`timeout` limits the duration the message
315    /// can wait to be sent before it is abandoned.
316    Publish { message: RawMessage, timeout: Delay },
317    /// Forward a Gossipsub message on network. `timeout` limits the duration the message
318    /// can wait to be sent before it is abandoned.
319    Forward { message: RawMessage, timeout: Delay },
320    /// Subscribe a topic.
321    Subscribe(TopicHash),
322    /// Unsubscribe a topic.
323    Unsubscribe(TopicHash),
324    /// Send a GRAFT control message.
325    Graft(Graft),
326    /// Send a PRUNE control message.
327    Prune(Prune),
328    /// Send a IHave control message.
329    IHave(IHave),
330    /// Send a IWant control message.
331    IWant(IWant),
332    /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant
333    /// control message.
334    IDontWant(IDontWant),
335}
336
337impl RpcOut {
338    /// Converts the GossipsubRPC into its protobuf format.
339    // A convenience function to avoid explicitly specifying types.
340    pub fn into_protobuf(self) -> proto::RPC {
341        self.into()
342    }
343}
344
345impl From<RpcOut> for proto::RPC {
346    /// Converts the RPC into protobuf format.
347    fn from(rpc: RpcOut) -> Self {
348        match rpc {
349            RpcOut::Publish {
350                message,
351                timeout: _,
352            } => proto::RPC {
353                subscriptions: Vec::new(),
354                publish: vec![message.into()],
355                control: None,
356            },
357            RpcOut::Forward {
358                message,
359                timeout: _,
360            } => proto::RPC {
361                publish: vec![message.into()],
362                subscriptions: Vec::new(),
363                control: None,
364            },
365            RpcOut::Subscribe(topic) => proto::RPC {
366                publish: Vec::new(),
367                subscriptions: vec![proto::SubOpts {
368                    subscribe: Some(true),
369                    topic_id: Some(topic.into_string()),
370                }],
371                control: None,
372            },
373            RpcOut::Unsubscribe(topic) => proto::RPC {
374                publish: Vec::new(),
375                subscriptions: vec![proto::SubOpts {
376                    subscribe: Some(false),
377                    topic_id: Some(topic.into_string()),
378                }],
379                control: None,
380            },
381            RpcOut::IHave(IHave {
382                topic_hash,
383                message_ids,
384            }) => proto::RPC {
385                publish: Vec::new(),
386                subscriptions: Vec::new(),
387                control: Some(proto::ControlMessage {
388                    ihave: vec![proto::ControlIHave {
389                        topic_id: Some(topic_hash.into_string()),
390                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
391                    }],
392                    iwant: vec![],
393                    graft: vec![],
394                    prune: vec![],
395                    idontwant: vec![],
396                }),
397            },
398            RpcOut::IWant(IWant { message_ids }) => proto::RPC {
399                publish: Vec::new(),
400                subscriptions: Vec::new(),
401                control: Some(proto::ControlMessage {
402                    ihave: vec![],
403                    iwant: vec![proto::ControlIWant {
404                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
405                    }],
406                    graft: vec![],
407                    prune: vec![],
408                    idontwant: vec![],
409                }),
410            },
411            RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
412                publish: Vec::new(),
413                subscriptions: vec![],
414                control: Some(proto::ControlMessage {
415                    ihave: vec![],
416                    iwant: vec![],
417                    graft: vec![proto::ControlGraft {
418                        topic_id: Some(topic_hash.into_string()),
419                    }],
420                    prune: vec![],
421                    idontwant: vec![],
422                }),
423            },
424            RpcOut::Prune(Prune {
425                topic_hash,
426                peers,
427                backoff,
428            }) => {
429                proto::RPC {
430                    publish: Vec::new(),
431                    subscriptions: vec![],
432                    control: Some(proto::ControlMessage {
433                        ihave: vec![],
434                        iwant: vec![],
435                        graft: vec![],
436                        prune: vec![proto::ControlPrune {
437                            topic_id: Some(topic_hash.into_string()),
438                            peers: peers
439                                .into_iter()
440                                .map(|info| proto::PeerInfo {
441                                    peer_id: info.peer_id.map(|id| id.to_bytes()),
442                                    // TODO, see https://github.com/libp2p/specs/pull/217
443                                    signed_peer_record: None,
444                                })
445                                .collect(),
446                            backoff,
447                        }],
448                        idontwant: vec![],
449                    }),
450                }
451            }
452            RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC {
453                publish: Vec::new(),
454                subscriptions: Vec::new(),
455                control: Some(proto::ControlMessage {
456                    ihave: vec![],
457                    iwant: vec![],
458                    graft: vec![],
459                    prune: vec![],
460                    idontwant: vec![proto::ControlIDontWant {
461                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
462                    }],
463                }),
464            },
465        }
466    }
467}
468
469/// An RPC received/sent.
470#[derive(Clone, PartialEq, Eq, Hash)]
471pub struct Rpc {
472    /// List of messages that were part of this RPC query.
473    pub messages: Vec<RawMessage>,
474    /// List of subscriptions.
475    pub subscriptions: Vec<Subscription>,
476    /// List of Gossipsub control messages.
477    pub control_msgs: Vec<ControlAction>,
478}
479
480impl Rpc {
481    /// Converts the GossipsubRPC into its protobuf format.
482    // A convenience function to avoid explicitly specifying types.
483    pub fn into_protobuf(self) -> proto::RPC {
484        self.into()
485    }
486}
487
488impl From<Rpc> for proto::RPC {
489    /// Converts the RPC into protobuf format.
490    fn from(rpc: Rpc) -> Self {
491        // Messages
492        let mut publish = Vec::new();
493
494        for message in rpc.messages.into_iter() {
495            let message = proto::Message {
496                from: message.source.map(|m| m.to_bytes()),
497                data: Some(message.data),
498                seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()),
499                topic: TopicHash::into_string(message.topic),
500                signature: message.signature,
501                key: message.key,
502            };
503
504            publish.push(message);
505        }
506
507        // subscriptions
508        let subscriptions = rpc
509            .subscriptions
510            .into_iter()
511            .map(|sub| proto::SubOpts {
512                subscribe: Some(sub.action == SubscriptionAction::Subscribe),
513                topic_id: Some(sub.topic_hash.into_string()),
514            })
515            .collect::<Vec<_>>();
516
517        // control messages
518        let mut control = proto::ControlMessage {
519            ihave: Vec::new(),
520            iwant: Vec::new(),
521            graft: Vec::new(),
522            prune: Vec::new(),
523            idontwant: Vec::new(),
524        };
525
526        let empty_control_msg = rpc.control_msgs.is_empty();
527
528        for action in rpc.control_msgs {
529            match action {
530                // collect all ihave messages
531                ControlAction::IHave(IHave {
532                    topic_hash,
533                    message_ids,
534                }) => {
535                    let rpc_ihave = proto::ControlIHave {
536                        topic_id: Some(topic_hash.into_string()),
537                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
538                    };
539                    control.ihave.push(rpc_ihave);
540                }
541                ControlAction::IWant(IWant { message_ids }) => {
542                    let rpc_iwant = proto::ControlIWant {
543                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
544                    };
545                    control.iwant.push(rpc_iwant);
546                }
547                ControlAction::Graft(Graft { topic_hash }) => {
548                    let rpc_graft = proto::ControlGraft {
549                        topic_id: Some(topic_hash.into_string()),
550                    };
551                    control.graft.push(rpc_graft);
552                }
553                ControlAction::Prune(Prune {
554                    topic_hash,
555                    peers,
556                    backoff,
557                }) => {
558                    let rpc_prune = proto::ControlPrune {
559                        topic_id: Some(topic_hash.into_string()),
560                        peers: peers
561                            .into_iter()
562                            .map(|info| proto::PeerInfo {
563                                peer_id: info.peer_id.map(|id| id.to_bytes()),
564                                // TODO, see https://github.com/libp2p/specs/pull/217
565                                signed_peer_record: None,
566                            })
567                            .collect(),
568                        backoff,
569                    };
570                    control.prune.push(rpc_prune);
571                }
572                ControlAction::IDontWant(IDontWant { message_ids }) => {
573                    let rpc_idontwant = proto::ControlIDontWant {
574                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
575                    };
576                    control.idontwant.push(rpc_idontwant);
577                }
578            }
579        }
580
581        proto::RPC {
582            subscriptions,
583            publish,
584            control: if empty_control_msg {
585                None
586            } else {
587                Some(control)
588            },
589        }
590    }
591}
592
593impl fmt::Debug for Rpc {
594    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
595        let mut b = f.debug_struct("GossipsubRpc");
596        if !self.messages.is_empty() {
597            b.field("messages", &self.messages);
598        }
599        if !self.subscriptions.is_empty() {
600            b.field("subscriptions", &self.subscriptions);
601        }
602        if !self.control_msgs.is_empty() {
603            b.field("control_msgs", &self.control_msgs);
604        }
605        b.finish()
606    }
607}
608
609impl PeerKind {
610    pub fn as_static_ref(&self) -> &'static str {
611        match self {
612            Self::NotSupported => "Not Supported",
613            Self::Floodsub => "Floodsub",
614            Self::Gossipsub => "Gossipsub v1.0",
615            Self::Gossipsubv1_1 => "Gossipsub v1.1",
616            Self::Gossipsubv1_2 => "Gossipsub v1.2",
617        }
618    }
619}
620
621impl AsRef<str> for PeerKind {
622    fn as_ref(&self) -> &str {
623        self.as_static_ref()
624    }
625}
626
627impl fmt::Display for PeerKind {
628    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
629        f.write_str(self.as_ref())
630    }
631}