1use std::{
22 collections::{
23 hash_map::{DefaultHasher, HashMap},
24 VecDeque,
25 },
26 iter,
27 task::{Context, Poll},
28};
29
30use bytes::Bytes;
31use cuckoofilter::{CuckooError, CuckooFilter};
32use fnv::FnvHashSet;
33use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
34use libp2p_identity::PeerId;
35use libp2p_swarm::{
36 behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm},
37 dial_opts::DialOpts,
38 CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler,
39 OneShotHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
40};
41use smallvec::SmallVec;
42
43use crate::{
44 protocol::{
45 FloodsubMessage, FloodsubProtocol, FloodsubRpc, FloodsubSubscription,
46 FloodsubSubscriptionAction,
47 },
48 topic::Topic,
49 FloodsubConfig,
50};
51
52pub struct Floodsub {
54 events: VecDeque<ToSwarm<FloodsubEvent, FloodsubRpc>>,
56
57 config: FloodsubConfig,
58
59 target_peers: FnvHashSet<PeerId>,
61
62 connected_peers: HashMap<PeerId, SmallVec<[Topic; 8]>>,
66
67 subscribed_topics: SmallVec<[Topic; 16]>,
70
71 received: CuckooFilter<DefaultHasher>,
74}
75
76impl Floodsub {
77 pub fn new(local_peer_id: PeerId) -> Self {
79 Self::from_config(FloodsubConfig::new(local_peer_id))
80 }
81
82 pub fn from_config(config: FloodsubConfig) -> Self {
84 Floodsub {
85 events: VecDeque::new(),
86 config,
87 target_peers: FnvHashSet::default(),
88 connected_peers: HashMap::new(),
89 subscribed_topics: SmallVec::new(),
90 received: CuckooFilter::new(),
91 }
92 }
93
94 #[inline]
96 pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) {
97 if self.connected_peers.contains_key(&peer_id) {
99 for topic in self.subscribed_topics.iter().cloned() {
100 self.events.push_back(ToSwarm::NotifyHandler {
101 peer_id,
102 handler: NotifyHandler::Any,
103 event: FloodsubRpc {
104 messages: Vec::new(),
105 subscriptions: vec![FloodsubSubscription {
106 topic,
107 action: FloodsubSubscriptionAction::Subscribe,
108 }],
109 },
110 });
111 }
112 }
113
114 if self.target_peers.insert(peer_id) {
115 self.events.push_back(ToSwarm::Dial {
116 opts: DialOpts::peer_id(peer_id).build(),
117 });
118 }
119 }
120
121 #[inline]
123 pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) {
124 self.target_peers.remove(peer_id);
125 }
126
127 pub fn subscribe(&mut self, topic: Topic) -> bool {
131 if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
132 return false;
133 }
134
135 for peer in self.connected_peers.keys() {
136 self.events.push_back(ToSwarm::NotifyHandler {
137 peer_id: *peer,
138 handler: NotifyHandler::Any,
139 event: FloodsubRpc {
140 messages: Vec::new(),
141 subscriptions: vec![FloodsubSubscription {
142 topic: topic.clone(),
143 action: FloodsubSubscriptionAction::Subscribe,
144 }],
145 },
146 });
147 }
148
149 self.subscribed_topics.push(topic);
150 true
151 }
152
153 pub fn unsubscribe(&mut self, topic: Topic) -> bool {
159 let Some(pos) = self.subscribed_topics.iter().position(|t| *t == topic) else {
160 return false;
161 };
162
163 self.subscribed_topics.remove(pos);
164
165 for peer in self.connected_peers.keys() {
166 self.events.push_back(ToSwarm::NotifyHandler {
167 peer_id: *peer,
168 handler: NotifyHandler::Any,
169 event: FloodsubRpc {
170 messages: Vec::new(),
171 subscriptions: vec![FloodsubSubscription {
172 topic: topic.clone(),
173 action: FloodsubSubscriptionAction::Unsubscribe,
174 }],
175 },
176 });
177 }
178
179 true
180 }
181
182 pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Bytes>) {
184 self.publish_many(iter::once(topic), data)
185 }
186
187 pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Bytes>) {
189 self.publish_many_any(iter::once(topic), data)
190 }
191
192 pub fn publish_many(
197 &mut self,
198 topic: impl IntoIterator<Item = impl Into<Topic>>,
199 data: impl Into<Bytes>,
200 ) {
201 self.publish_many_inner(topic, data, true)
202 }
203
204 pub fn publish_many_any(
207 &mut self,
208 topic: impl IntoIterator<Item = impl Into<Topic>>,
209 data: impl Into<Bytes>,
210 ) {
211 self.publish_many_inner(topic, data, false)
212 }
213
214 fn publish_many_inner(
215 &mut self,
216 topic: impl IntoIterator<Item = impl Into<Topic>>,
217 data: impl Into<Bytes>,
218 check_self_subscriptions: bool,
219 ) {
220 let message = FloodsubMessage {
221 source: self.config.local_peer_id,
222 data: data.into(),
223 sequence_number: rand::random::<[u8; 20]>().to_vec(),
227 topics: topic.into_iter().map(Into::into).collect(),
228 };
229
230 let self_subscribed = self
231 .subscribed_topics
232 .iter()
233 .any(|t| message.topics.iter().any(|u| t == u));
234 if self_subscribed {
235 if let Err(e @ CuckooError::NotEnoughSpace) = self.received.add(&message) {
236 tracing::warn!(
237 "Message was added to 'received' Cuckoofilter but some \
238 other message was removed as a consequence: {}",
239 e,
240 );
241 }
242 if self.config.subscribe_local_messages {
243 self.events
244 .push_back(ToSwarm::GenerateEvent(FloodsubEvent::Message(
245 message.clone(),
246 )));
247 }
248 }
249 if check_self_subscriptions && !self_subscribed {
252 return;
253 }
254
255 for (peer_id, sub_topic) in self.connected_peers.iter() {
257 if !self.target_peers.contains(peer_id) {
259 continue;
260 }
261
262 if !sub_topic
264 .iter()
265 .any(|t| message.topics.iter().any(|u| t == u))
266 {
267 continue;
268 }
269
270 self.events.push_back(ToSwarm::NotifyHandler {
271 peer_id: *peer_id,
272 handler: NotifyHandler::Any,
273 event: FloodsubRpc {
274 subscriptions: Vec::new(),
275 messages: vec![message.clone()],
276 },
277 });
278 }
279 }
280
281 fn on_connection_established(
282 &mut self,
283 ConnectionEstablished {
284 peer_id,
285 other_established,
286 ..
287 }: ConnectionEstablished,
288 ) {
289 if other_established > 0 {
290 return;
292 }
293
294 if self.target_peers.contains(&peer_id) {
296 for topic in self.subscribed_topics.iter().cloned() {
297 self.events.push_back(ToSwarm::NotifyHandler {
298 peer_id,
299 handler: NotifyHandler::Any,
300 event: FloodsubRpc {
301 messages: Vec::new(),
302 subscriptions: vec![FloodsubSubscription {
303 topic,
304 action: FloodsubSubscriptionAction::Subscribe,
305 }],
306 },
307 });
308 }
309 }
310
311 self.connected_peers.insert(peer_id, SmallVec::new());
312 }
313
314 fn on_connection_closed(
315 &mut self,
316 ConnectionClosed {
317 peer_id,
318 remaining_established,
319 ..
320 }: ConnectionClosed,
321 ) {
322 if remaining_established > 0 {
323 return;
325 }
326
327 let was_in = self.connected_peers.remove(&peer_id);
328 debug_assert!(was_in.is_some());
329
330 if self.target_peers.contains(&peer_id) {
333 self.events.push_back(ToSwarm::Dial {
334 opts: DialOpts::peer_id(peer_id).build(),
335 });
336 }
337 }
338}
339
340impl NetworkBehaviour for Floodsub {
341 type ConnectionHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
342 type ToSwarm = FloodsubEvent;
343
344 fn handle_established_inbound_connection(
345 &mut self,
346 _: ConnectionId,
347 _: PeerId,
348 _: &Multiaddr,
349 _: &Multiaddr,
350 ) -> Result<THandler<Self>, ConnectionDenied> {
351 Ok(Default::default())
352 }
353
354 fn handle_established_outbound_connection(
355 &mut self,
356 _: ConnectionId,
357 _: PeerId,
358 _: &Multiaddr,
359 _: Endpoint,
360 _: PortUse,
361 ) -> Result<THandler<Self>, ConnectionDenied> {
362 Ok(Default::default())
363 }
364
365 fn on_connection_handler_event(
366 &mut self,
367 propagation_source: PeerId,
368 connection_id: ConnectionId,
369 event: THandlerOutEvent<Self>,
370 ) {
371 let event = match event {
373 Ok(InnerMessage::Rx(event)) => event,
374 Ok(InnerMessage::Sent) => return,
375 Err(e) => {
376 tracing::debug!("Failed to send floodsub message: {e}");
377 self.events.push_back(ToSwarm::CloseConnection {
378 peer_id: propagation_source,
379 connection: CloseConnection::One(connection_id),
380 });
381 return;
382 }
383 };
384
385 for subscription in event.subscriptions {
387 let remote_peer_topics = self.connected_peers
388 .get_mut(&propagation_source)
389 .expect("connected_peers is kept in sync with the peers we are connected to; we are guaranteed to only receive events from connected peers; QED");
390 match subscription.action {
391 FloodsubSubscriptionAction::Subscribe => {
392 if !remote_peer_topics.contains(&subscription.topic) {
393 remote_peer_topics.push(subscription.topic.clone());
394 }
395 self.events
396 .push_back(ToSwarm::GenerateEvent(FloodsubEvent::Subscribed {
397 peer_id: propagation_source,
398 topic: subscription.topic,
399 }));
400 }
401 FloodsubSubscriptionAction::Unsubscribe => {
402 if let Some(pos) = remote_peer_topics
403 .iter()
404 .position(|t| t == &subscription.topic)
405 {
406 remote_peer_topics.remove(pos);
407 }
408 self.events
409 .push_back(ToSwarm::GenerateEvent(FloodsubEvent::Unsubscribed {
410 peer_id: propagation_source,
411 topic: subscription.topic,
412 }));
413 }
414 }
415 }
416
417 let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new();
419
420 for message in event.messages {
421 match self.received.test_and_add(&message) {
424 Ok(true) => {} Ok(false) => continue, Err(e @ CuckooError::NotEnoughSpace) => {
427 tracing::warn!(
429 "Message was added to 'received' Cuckoofilter but some \
430 other message was removed as a consequence: {}",
431 e,
432 );
433 }
434 }
435
436 if self
438 .subscribed_topics
439 .iter()
440 .any(|t| message.topics.iter().any(|u| t == u))
441 {
442 let event = FloodsubEvent::Message(message.clone());
443 self.events.push_back(ToSwarm::GenerateEvent(event));
444 }
445
446 for (peer_id, subscr_topics) in self.connected_peers.iter() {
448 if peer_id == &propagation_source {
449 continue;
450 }
451
452 if !self.target_peers.contains(peer_id) {
454 continue;
455 }
456
457 if !subscr_topics
459 .iter()
460 .any(|t| message.topics.iter().any(|u| t == u))
461 {
462 continue;
463 }
464
465 if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
466 rpcs_to_dispatch[pos].1.messages.push(message.clone());
467 } else {
468 rpcs_to_dispatch.push((
469 *peer_id,
470 FloodsubRpc {
471 subscriptions: Vec::new(),
472 messages: vec![message.clone()],
473 },
474 ));
475 }
476 }
477 }
478
479 for (peer_id, rpc) in rpcs_to_dispatch {
480 self.events.push_back(ToSwarm::NotifyHandler {
481 peer_id,
482 handler: NotifyHandler::Any,
483 event: rpc,
484 });
485 }
486 }
487
488 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
489 fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
490 if let Some(event) = self.events.pop_front() {
491 return Poll::Ready(event);
492 }
493
494 Poll::Pending
495 }
496
497 fn on_swarm_event(&mut self, event: FromSwarm) {
498 match event {
499 FromSwarm::ConnectionEstablished(connection_established) => {
500 self.on_connection_established(connection_established)
501 }
502 FromSwarm::ConnectionClosed(connection_closed) => {
503 self.on_connection_closed(connection_closed)
504 }
505 _ => {}
506 }
507 }
508}
509
510#[derive(Debug)]
512pub enum InnerMessage {
513 Rx(FloodsubRpc),
515 Sent,
517}
518
519impl From<FloodsubRpc> for InnerMessage {
520 #[inline]
521 fn from(rpc: FloodsubRpc) -> InnerMessage {
522 InnerMessage::Rx(rpc)
523 }
524}
525
526impl From<()> for InnerMessage {
527 #[inline]
528 fn from(_: ()) -> InnerMessage {
529 InnerMessage::Sent
530 }
531}
532
533#[derive(Debug)]
535pub enum FloodsubEvent {
536 Message(FloodsubMessage),
538
539 Subscribed {
541 peer_id: PeerId,
543 topic: Topic,
545 },
546
547 Unsubscribed {
549 peer_id: PeerId,
551 topic: Topic,
553 },
554}