1use std::{collections::BTreeSet, fmt, fmt::Debug};
23
24use futures_timer::Delay;
25use hashlink::LinkedHashMap;
26use libp2p_identity::PeerId;
27use libp2p_swarm::ConnectionId;
28use prometheus_client::encoding::EncodeLabelValue;
29use quick_protobuf::MessageWrite;
30#[cfg(feature = "serde")]
31use serde::{Deserialize, Serialize};
32use web_time::Instant;
33
34use crate::{rpc::Sender, rpc_proto::proto, TopicHash};
35
/// Counters for messages that could not be delivered to a peer.
///
/// NOTE(review): the code that increments these counters is outside this file, so the
/// per-field descriptions are inferred from the field names — confirm against the callers.
#[derive(Clone, Debug, Default)]
pub struct FailedMessages {
    /// Failed publish messages (presumably classified by message type rather than by cause).
    pub publish: usize,
    /// Failed forward messages (presumably classified by message type rather than by cause).
    pub forward: usize,
    /// Failures attributed to the priority queue; counted by [`Self::total_queue_full`].
    pub priority: usize,
    /// Failures attributed to the non-priority queue; counted by [`Self::total_queue_full`].
    pub non_priority: usize,
    /// Messages that timed out before they could be sent.
    pub timeout: usize,
}

impl FailedMessages {
    /// Number of failures attributed to the send queues, i.e. `priority + non_priority`.
    pub fn total_queue_full(&self) -> usize {
        self.priority + self.non_priority
    }

    /// Total number of failed messages: queue failures plus timed-out messages.
    ///
    /// Previously this returned the same value as [`Self::total_queue_full`], which left
    /// `timeout` out of the total. `publish` and `forward` are deliberately not added:
    /// they presumably count the same failures again, split by message type —
    /// TODO confirm against the code that updates the counters.
    pub fn total(&self) -> usize {
        self.total_queue_full() + self.timeout
    }
}
63
/// Verdict for a message.
///
/// NOTE(review): the consumer of this enum is not visible in this file; presumably it is the
/// result an application reports after validating a received message — confirm at the call
/// site before relying on the per-variant effects.
#[derive(Debug)]
pub enum MessageAcceptance {
    /// The message is accepted.
    Accept,
    /// The message is rejected.
    Reject,
    /// The message is ignored (neither accepted nor rejected).
    Ignore,
}
75
/// Identifier of a message, stored as an opaque byte string.
///
/// Ordering, equality and hashing are all derived from the wrapped bytes.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct MessageId(pub Vec<u8>);

impl MessageId {
    /// Creates an identifier by copying `value` into a freshly allocated buffer.
    pub fn new(value: &[u8]) -> Self {
        MessageId(Vec::from(value))
    }
}

/// Anything convertible into a byte vector can be turned into a [`MessageId`].
impl<T> From<T> for MessageId
where
    T: Into<Vec<u8>>,
{
    fn from(value: T) -> Self {
        let bytes: Vec<u8> = value.into();
        MessageId(bytes)
    }
}
91
92impl std::fmt::Display for MessageId {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 write!(f, "{}", hex_fmt::HexFmt(&self.0))
95 }
96}
97
98impl std::fmt::Debug for MessageId {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0))
101 }
102}
103
/// State kept for a single peer.
///
/// NOTE(review): the code that maintains this struct is outside this file; the field notes
/// marked "presumably" are inferred from names and types and should be confirmed.
#[derive(Debug)]
pub(crate) struct PeerConnections {
    /// Protocol flavour associated with the peer (see [`PeerKind`]).
    pub(crate) kind: PeerKind,
    /// Identifiers of the connections associated with the peer.
    pub(crate) connections: Vec<ConnectionId>,
    /// Topics associated with the peer — presumably the topics it is subscribed to.
    pub(crate) topics: BTreeSet<TopicHash>,
    /// Sender associated with the peer — presumably used to queue outbound RPCs for it.
    pub(crate) sender: Sender,
    /// Message ids, each paired with an [`Instant`] — presumably ids the peer asked not to
    /// be sent, with the time the request was recorded.
    pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
}
117
/// The protocol flavour associated with a peer.
///
/// The human-readable name of each variant is provided by `PeerKind::as_static_ref`.
#[derive(Debug, Clone, Copy, PartialEq, Hash, EncodeLabelValue, Eq)]
pub enum PeerKind {
    /// Gossipsub v1.2.
    Gossipsubv1_2,
    /// Gossipsub v1.1.
    Gossipsubv1_1,
    /// Gossipsub v1.0.
    Gossipsub,
    /// Floodsub.
    Floodsub,
    /// No supported protocol.
    NotSupported,
}
132
/// A message together with the fields needed to serialise it as a `proto::Message`.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct RawMessage {
    /// Peer the message originates from, if known; serialised as the `from` field.
    pub source: Option<PeerId>,

    /// Payload bytes of the message.
    pub data: Vec<u8>,

    /// Optional sequence number; serialised as big-endian bytes in the `seqno` field.
    pub sequence_number: Option<u64>,

    /// Topic the message belongs to.
    pub topic: TopicHash,

    /// Optional signature bytes, passed through to the protobuf unchanged.
    pub signature: Option<Vec<u8>>,

    /// Optional key bytes, passed through to the protobuf unchanged.
    pub key: Option<Vec<u8>>,

    /// Local flag that is not part of the protobuf encoding — presumably records whether
    /// the message has been validated; confirm against the code that sets it.
    pub validated: bool,
}
157
158impl PeerKind {
159 pub(crate) fn is_gossipsub(&self) -> bool {
161 matches!(
162 self,
163 Self::Gossipsubv1_2 | Self::Gossipsubv1_1 | Self::Gossipsub
164 )
165 }
166}
167
168impl RawMessage {
169 pub fn raw_protobuf_len(&self) -> usize {
171 let message = proto::Message {
172 from: self.source.map(|m| m.to_bytes()),
173 data: Some(self.data.clone()),
174 seqno: self.sequence_number.map(|s| s.to_be_bytes().to_vec()),
175 topic: TopicHash::into_string(self.topic.clone()),
176 signature: self.signature.clone(),
177 key: self.key.clone(),
178 };
179 message.get_size()
180 }
181}
182
183impl From<RawMessage> for proto::Message {
184 fn from(raw: RawMessage) -> Self {
185 proto::Message {
186 from: raw.source.map(|m| m.to_bytes()),
187 data: Some(raw.data),
188 seqno: raw.sequence_number.map(|s| s.to_be_bytes().to_vec()),
189 topic: TopicHash::into_string(raw.topic),
190 signature: raw.signature,
191 key: raw.key,
192 }
193 }
194}
195
/// A message without the signature, key and validation fields carried by [`RawMessage`].
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Message {
    /// Peer the message originates from, if known.
    pub source: Option<PeerId>,

    /// Payload bytes of the message.
    pub data: Vec<u8>,

    /// Optional sequence number of the message.
    pub sequence_number: Option<u64>,

    /// Topic the message belongs to.
    pub topic: TopicHash,
}
212
213impl fmt::Debug for Message {
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 f.debug_struct("Message")
216 .field(
217 "data",
218 &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)),
219 )
220 .field("source", &self.source)
221 .field("sequence_number", &self.sequence_number)
222 .field("topic", &self.topic)
223 .finish()
224 }
225}
226
/// A subscription change for one topic.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Subscription {
    /// Whether this is a subscribe or an unsubscribe.
    pub action: SubscriptionAction,
    /// The topic the action applies to.
    pub topic_hash: TopicHash,
}
235
/// Direction of a [`Subscription`] change.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionAction {
    /// Subscribe; encoded as `subscribe: Some(true)` when an `Rpc` is converted to protobuf.
    Subscribe,
    /// Unsubscribe; encoded as `subscribe: Some(false)` when an `Rpc` is converted to protobuf.
    Unsubscribe,
}
244
/// Peer entry carried in the `peers` list of a [`Prune`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct PeerInfo {
    /// Identity of the peer, if known; serialised as raw bytes in the protobuf.
    pub(crate) peer_id: Option<PeerId>,
}
252
/// A control message; each variant wraps the payload struct of the same name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ControlAction {
    /// An IHAVE control message (see [`IHave`]).
    IHave(IHave),
    /// An IWANT control message (see [`IWant`]).
    IWant(IWant),
    /// A GRAFT control message (see [`Graft`]).
    Graft(Graft),
    /// A PRUNE control message (see [`Prune`]).
    Prune(Prune),
    /// An IDONTWANT control message (see [`IDontWant`]).
    IDontWant(IDontWant),
}
269
/// Payload of an IHAVE control message: message ids tied to one topic.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IHave {
    /// Topic the listed messages belong to.
    pub(crate) topic_hash: TopicHash,
    /// Ids of the messages being advertised.
    pub(crate) message_ids: Vec<MessageId>,
}
278
/// Payload of an IWANT control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IWant {
    /// Ids of the messages being requested.
    pub(crate) message_ids: Vec<MessageId>,
}
285
/// Payload of a GRAFT control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Graft {
    /// Topic the graft applies to.
    pub(crate) topic_hash: TopicHash,
}
292
/// Payload of a PRUNE control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Prune {
    /// Topic the prune applies to.
    pub(crate) topic_hash: TopicHash,
    /// Peer entries sent along with the prune.
    pub(crate) peers: Vec<PeerInfo>,
    /// Optional backoff value, passed to the protobuf unchanged.
    /// NOTE(review): the unit is not visible in this file — confirm before interpreting it.
    pub(crate) backoff: Option<u64>,
}
303
/// Payload of an IDONTWANT control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IDontWant {
    /// Ids of the messages the sender does not want to receive.
    pub(crate) message_ids: Vec<MessageId>,
}
310
/// A single outbound RPC; each variant converts to a `proto::RPC` holding exactly one entry.
#[derive(Debug)]
pub enum RpcOut {
    /// A message placed in the protobuf `publish` list. `timeout` is not serialised —
    /// presumably it bounds how long the message may wait to be sent; confirm at the use site.
    Publish { message: RawMessage, timeout: Delay },
    /// A message placed in the protobuf `publish` list, exactly like `Publish`. `timeout`
    /// is not serialised — presumably it bounds how long the message may wait to be sent.
    Forward { message: RawMessage, timeout: Delay },
    /// Subscribe to a topic.
    Subscribe(TopicHash),
    /// Unsubscribe from a topic.
    Unsubscribe(TopicHash),
    /// A GRAFT control message.
    Graft(Graft),
    /// A PRUNE control message.
    Prune(Prune),
    /// An IHAVE control message.
    IHave(IHave),
    /// An IWANT control message.
    IWant(IWant),
    /// An IDONTWANT control message.
    IDontWant(IDontWant),
}
336
337impl RpcOut {
338 pub fn into_protobuf(self) -> proto::RPC {
341 self.into()
342 }
343}
344
345impl From<RpcOut> for proto::RPC {
346 fn from(rpc: RpcOut) -> Self {
348 match rpc {
349 RpcOut::Publish {
350 message,
351 timeout: _,
352 } => proto::RPC {
353 subscriptions: Vec::new(),
354 publish: vec![message.into()],
355 control: None,
356 },
357 RpcOut::Forward {
358 message,
359 timeout: _,
360 } => proto::RPC {
361 publish: vec![message.into()],
362 subscriptions: Vec::new(),
363 control: None,
364 },
365 RpcOut::Subscribe(topic) => proto::RPC {
366 publish: Vec::new(),
367 subscriptions: vec![proto::SubOpts {
368 subscribe: Some(true),
369 topic_id: Some(topic.into_string()),
370 }],
371 control: None,
372 },
373 RpcOut::Unsubscribe(topic) => proto::RPC {
374 publish: Vec::new(),
375 subscriptions: vec![proto::SubOpts {
376 subscribe: Some(false),
377 topic_id: Some(topic.into_string()),
378 }],
379 control: None,
380 },
381 RpcOut::IHave(IHave {
382 topic_hash,
383 message_ids,
384 }) => proto::RPC {
385 publish: Vec::new(),
386 subscriptions: Vec::new(),
387 control: Some(proto::ControlMessage {
388 ihave: vec![proto::ControlIHave {
389 topic_id: Some(topic_hash.into_string()),
390 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
391 }],
392 iwant: vec![],
393 graft: vec![],
394 prune: vec![],
395 idontwant: vec![],
396 }),
397 },
398 RpcOut::IWant(IWant { message_ids }) => proto::RPC {
399 publish: Vec::new(),
400 subscriptions: Vec::new(),
401 control: Some(proto::ControlMessage {
402 ihave: vec![],
403 iwant: vec![proto::ControlIWant {
404 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
405 }],
406 graft: vec![],
407 prune: vec![],
408 idontwant: vec![],
409 }),
410 },
411 RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
412 publish: Vec::new(),
413 subscriptions: vec![],
414 control: Some(proto::ControlMessage {
415 ihave: vec![],
416 iwant: vec![],
417 graft: vec![proto::ControlGraft {
418 topic_id: Some(topic_hash.into_string()),
419 }],
420 prune: vec![],
421 idontwant: vec![],
422 }),
423 },
424 RpcOut::Prune(Prune {
425 topic_hash,
426 peers,
427 backoff,
428 }) => {
429 proto::RPC {
430 publish: Vec::new(),
431 subscriptions: vec![],
432 control: Some(proto::ControlMessage {
433 ihave: vec![],
434 iwant: vec![],
435 graft: vec![],
436 prune: vec![proto::ControlPrune {
437 topic_id: Some(topic_hash.into_string()),
438 peers: peers
439 .into_iter()
440 .map(|info| proto::PeerInfo {
441 peer_id: info.peer_id.map(|id| id.to_bytes()),
442 signed_peer_record: None,
444 })
445 .collect(),
446 backoff,
447 }],
448 idontwant: vec![],
449 }),
450 }
451 }
452 RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC {
453 publish: Vec::new(),
454 subscriptions: Vec::new(),
455 control: Some(proto::ControlMessage {
456 ihave: vec![],
457 iwant: vec![],
458 graft: vec![],
459 prune: vec![],
460 idontwant: vec![proto::ControlIDontWant {
461 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
462 }],
463 }),
464 },
465 }
466 }
467}
468
/// An RPC that can carry any number of messages, subscription changes and control messages.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Rpc {
    /// Messages carried by the RPC; they become the protobuf `publish` list.
    pub messages: Vec<RawMessage>,
    /// Subscription changes carried by the RPC.
    pub subscriptions: Vec<Subscription>,
    /// Control messages carried by the RPC; when empty, the protobuf `control` field is `None`.
    pub control_msgs: Vec<ControlAction>,
}
479
480impl Rpc {
481 pub fn into_protobuf(self) -> proto::RPC {
484 self.into()
485 }
486}
487
488impl From<Rpc> for proto::RPC {
489 fn from(rpc: Rpc) -> Self {
491 let mut publish = Vec::new();
493
494 for message in rpc.messages.into_iter() {
495 let message = proto::Message {
496 from: message.source.map(|m| m.to_bytes()),
497 data: Some(message.data),
498 seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()),
499 topic: TopicHash::into_string(message.topic),
500 signature: message.signature,
501 key: message.key,
502 };
503
504 publish.push(message);
505 }
506
507 let subscriptions = rpc
509 .subscriptions
510 .into_iter()
511 .map(|sub| proto::SubOpts {
512 subscribe: Some(sub.action == SubscriptionAction::Subscribe),
513 topic_id: Some(sub.topic_hash.into_string()),
514 })
515 .collect::<Vec<_>>();
516
517 let mut control = proto::ControlMessage {
519 ihave: Vec::new(),
520 iwant: Vec::new(),
521 graft: Vec::new(),
522 prune: Vec::new(),
523 idontwant: Vec::new(),
524 };
525
526 let empty_control_msg = rpc.control_msgs.is_empty();
527
528 for action in rpc.control_msgs {
529 match action {
530 ControlAction::IHave(IHave {
532 topic_hash,
533 message_ids,
534 }) => {
535 let rpc_ihave = proto::ControlIHave {
536 topic_id: Some(topic_hash.into_string()),
537 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
538 };
539 control.ihave.push(rpc_ihave);
540 }
541 ControlAction::IWant(IWant { message_ids }) => {
542 let rpc_iwant = proto::ControlIWant {
543 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
544 };
545 control.iwant.push(rpc_iwant);
546 }
547 ControlAction::Graft(Graft { topic_hash }) => {
548 let rpc_graft = proto::ControlGraft {
549 topic_id: Some(topic_hash.into_string()),
550 };
551 control.graft.push(rpc_graft);
552 }
553 ControlAction::Prune(Prune {
554 topic_hash,
555 peers,
556 backoff,
557 }) => {
558 let rpc_prune = proto::ControlPrune {
559 topic_id: Some(topic_hash.into_string()),
560 peers: peers
561 .into_iter()
562 .map(|info| proto::PeerInfo {
563 peer_id: info.peer_id.map(|id| id.to_bytes()),
564 signed_peer_record: None,
566 })
567 .collect(),
568 backoff,
569 };
570 control.prune.push(rpc_prune);
571 }
572 ControlAction::IDontWant(IDontWant { message_ids }) => {
573 let rpc_idontwant = proto::ControlIDontWant {
574 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
575 };
576 control.idontwant.push(rpc_idontwant);
577 }
578 }
579 }
580
581 proto::RPC {
582 subscriptions,
583 publish,
584 control: if empty_control_msg {
585 None
586 } else {
587 Some(control)
588 },
589 }
590 }
591}
592
593impl fmt::Debug for Rpc {
594 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
595 let mut b = f.debug_struct("GossipsubRpc");
596 if !self.messages.is_empty() {
597 b.field("messages", &self.messages);
598 }
599 if !self.subscriptions.is_empty() {
600 b.field("subscriptions", &self.subscriptions);
601 }
602 if !self.control_msgs.is_empty() {
603 b.field("control_msgs", &self.control_msgs);
604 }
605 b.finish()
606 }
607}
608
609impl PeerKind {
610 pub fn as_static_ref(&self) -> &'static str {
611 match self {
612 Self::NotSupported => "Not Supported",
613 Self::Floodsub => "Floodsub",
614 Self::Gossipsub => "Gossipsub v1.0",
615 Self::Gossipsubv1_1 => "Gossipsub v1.1",
616 Self::Gossipsubv1_2 => "Gossipsub v1.2",
617 }
618 }
619}
620
621impl AsRef<str> for PeerKind {
622 fn as_ref(&self) -> &str {
623 self.as_static_ref()
624 }
625}
626
627impl fmt::Display for PeerKind {
628 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
629 f.write_str(self.as_ref())
630 }
631}