1use crate::service::{metrics::PeerStoreMetrics, traits::PeerStore as PeerStoreT};
23
24use libp2p::PeerId;
25use log::trace;
26use parking_lot::Mutex;
27use partial_sort::PartialSort;
28use prometheus_endpoint::Registry;
29use sc_network_common::{role::ObservedRole, types::ReputationChange};
30use std::{
31 cmp::{Ord, Ordering, PartialOrd},
32 collections::{hash_map::Entry, HashMap, HashSet},
33 fmt::Debug,
34 sync::Arc,
35 time::{Duration, Instant},
36};
37use wasm_timer::Delay;
38
/// Log target attached to every log statement emitted from this file.
pub const LOG_TARGET: &str = "peerset";

/// Reputation below which a peer counts as banned: 71 times one hundredth of `i32::MIN`
/// (integer division), i.e. roughly 71% of the way down to the minimum representable value.
pub const BANNED_THRESHOLD: i32 = 71 * (i32::MIN / 100);
/// Reputation change applied to a peer each time a disconnect is reported for it.
const DISCONNECT_REPUTATION_CHANGE: i32 = -256;
/// Divisor of the per-second decay: every decay step moves a reputation towards zero by
/// `reputation / INVERSE_DECREMENT`, or by 1 when that quotient rounds to zero.
const INVERSE_DECREMENT: i32 = 200;
/// A peer whose reputation is zero is dropped from the store once it has not been updated
/// for this long (one hour).
const FORGET_AFTER: Duration = Duration::from_secs(3600);
62
/// Handle through which the peer store can ask a protocol to drop a peer.
pub trait ProtocolHandle: Debug + Send + Sync {
    /// Disconnect `peer_id`. The peer store in this file calls this on every registered
    /// handle whenever a reported peer is banned after the report has been applied.
    fn disconnect_peer(&self, peer_id: sc_network_types::PeerId);
}
68
/// Interface for querying and updating the reputation data kept about peers.
///
/// The behavior notes below describe the `PeerStoreHandle` implementation in this file.
pub trait PeerStoreProvider: Debug + Send + Sync {
    /// Check whether the peer is banned. Unknown peers are not banned.
    fn is_banned(&self, peer_id: &sc_network_types::PeerId) -> bool;

    /// Register a protocol handle that is asked to disconnect peers that get banned.
    fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandle>);

    /// Report that a peer disconnected; this lowers the peer's reputation.
    fn report_disconnect(&self, peer_id: sc_network_types::PeerId);

    /// Adjust the peer's reputation by `change`.
    fn report_peer(&self, peer_id: sc_network_types::PeerId, change: ReputationChange);

    /// Record the role observed for the peer.
    fn set_peer_role(&self, peer_id: &sc_network_types::PeerId, role: ObservedRole);

    /// Current reputation of the peer; `0` for peers that are not known.
    fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32;

    /// Role recorded for the peer, or `None` if the peer or its role is not known.
    fn peer_role(&self, peer_id: &sc_network_types::PeerId) -> Option<ObservedRole>;

    /// Return at most `count` non-banned peers with the highest reputation, skipping every
    /// peer contained in `ignored`.
    fn outgoing_candidates(
        &self,
        count: usize,
        ignored: HashSet<sc_network_types::PeerId>,
    ) -> Vec<sc_network_types::PeerId>;

    /// Make the peer known to the store, or refresh its last-updated time if already known.
    fn add_known_peer(&self, peer_id: sc_network_types::PeerId);
}
102
/// Cloneable handle to the peer store; all clones share the same mutex-protected state.
#[derive(Debug, Clone)]
pub struct PeerStoreHandle {
    /// Shared, lock-protected peer store state.
    inner: Arc<Mutex<PeerStoreInner>>,
}
108
109impl PeerStoreProvider for PeerStoreHandle {
110 fn is_banned(&self, peer_id: &sc_network_types::PeerId) -> bool {
111 self.inner.lock().is_banned(&peer_id.into())
112 }
113
114 fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandle>) {
115 self.inner.lock().register_protocol(protocol_handle);
116 }
117
118 fn report_disconnect(&self, peer_id: sc_network_types::PeerId) {
119 let mut inner = self.inner.lock();
120 inner.report_disconnect(peer_id.into())
121 }
122
123 fn report_peer(&self, peer_id: sc_network_types::PeerId, change: ReputationChange) {
124 let mut inner = self.inner.lock();
125 inner.report_peer(peer_id.into(), change)
126 }
127
128 fn set_peer_role(&self, peer_id: &sc_network_types::PeerId, role: ObservedRole) {
129 let mut inner = self.inner.lock();
130 inner.set_peer_role(&peer_id.into(), role)
131 }
132
133 fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32 {
134 self.inner.lock().peer_reputation(&peer_id.into())
135 }
136
137 fn peer_role(&self, peer_id: &sc_network_types::PeerId) -> Option<ObservedRole> {
138 self.inner.lock().peer_role(&peer_id.into())
139 }
140
141 fn outgoing_candidates(
142 &self,
143 count: usize,
144 ignored: HashSet<sc_network_types::PeerId>,
145 ) -> Vec<sc_network_types::PeerId> {
146 self.inner
147 .lock()
148 .outgoing_candidates(count, ignored.iter().map(|peer_id| (*peer_id).into()).collect())
149 .iter()
150 .map(|peer_id| peer_id.into())
151 .collect()
152 }
153
154 fn add_known_peer(&self, peer_id: sc_network_types::PeerId) {
155 self.inner.lock().add_known_peer(peer_id.into());
156 }
157}
158
/// Reputation bookkeeping kept for a single peer.
///
/// Equality and ordering of `PeerInfo` values look at the `reputation` field only.
#[derive(Debug, Clone, Copy)]
struct PeerInfo {
    /// Reputation score; a value below `BANNED_THRESHOLD` means the peer is banned.
    reputation: i32,

    /// Instant of the last reputation change or explicit refresh of this entry.
    last_updated: Instant,

    /// Role observed for the peer, if one has been recorded.
    role: Option<ObservedRole>,
}
170
171impl Default for PeerInfo {
172 fn default() -> Self {
173 Self { reputation: 0, last_updated: Instant::now(), role: None }
174 }
175}
176
177impl PartialEq for PeerInfo {
178 fn eq(&self, other: &Self) -> bool {
179 self.reputation == other.reputation
180 }
181}
182
// Marker impl: equality compares the `i32` reputation only, which is a total equivalence.
impl Eq for PeerInfo {}
184
185impl Ord for PeerInfo {
186 fn cmp(&self, other: &Self) -> Ordering {
188 self.reputation.cmp(&other.reputation).reverse()
189 }
190}
191
192impl PartialOrd for PeerInfo {
193 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
194 Some(self.cmp(other))
195 }
196}
197
198impl PeerInfo {
199 fn is_banned(&self) -> bool {
200 self.reputation < BANNED_THRESHOLD
201 }
202
203 fn add_reputation(&mut self, increment: i32) {
204 self.reputation = self.reputation.saturating_add(increment);
205 self.bump_last_updated();
206 }
207
208 fn decay_reputation(&mut self, seconds_passed: u64) {
209 for _ in 0..seconds_passed {
212 let mut diff = self.reputation / INVERSE_DECREMENT;
213 if diff == 0 && self.reputation < 0 {
214 diff = -1;
215 } else if diff == 0 && self.reputation > 0 {
216 diff = 1;
217 }
218
219 self.reputation = self.reputation.saturating_sub(diff);
220
221 if self.reputation == 0 {
222 break
223 }
224 }
225 }
226
227 fn bump_last_updated(&mut self) {
228 self.last_updated = Instant::now();
229 }
230}
231
/// State of the peer store; shared behind a mutex by `PeerStore` and `PeerStoreHandle`.
#[derive(Debug)]
struct PeerStoreInner {
    /// Reputation data for every known peer.
    peers: HashMap<PeerId, PeerInfo>,
    /// Protocol handles asked to disconnect a peer when it is banned.
    protocols: Vec<Arc<dyn ProtocolHandle>>,
    /// Metrics to update on every time tick; `None` when no metrics are available.
    metrics: Option<PeerStoreMetrics>,
}
238
239impl PeerStoreInner {
240 fn is_banned(&self, peer_id: &PeerId) -> bool {
241 self.peers.get(peer_id).map_or(false, |info| info.is_banned())
242 }
243
244 fn register_protocol(&mut self, protocol_handle: Arc<dyn ProtocolHandle>) {
245 self.protocols.push(protocol_handle);
246 }
247
248 fn report_disconnect(&mut self, peer_id: PeerId) {
249 let peer_info = self.peers.entry(peer_id).or_default();
250 peer_info.add_reputation(DISCONNECT_REPUTATION_CHANGE);
251
252 log::trace!(
253 target: LOG_TARGET,
254 "Peer {} disconnected, reputation: {:+} to {}",
255 peer_id,
256 DISCONNECT_REPUTATION_CHANGE,
257 peer_info.reputation,
258 );
259 }
260
261 fn report_peer(&mut self, peer_id: PeerId, change: ReputationChange) {
262 let peer_info = self.peers.entry(peer_id).or_default();
263 let was_banned = peer_info.is_banned();
264 peer_info.add_reputation(change.value);
265
266 log::trace!(
267 target: LOG_TARGET,
268 "Report {}: {:+} to {}. Reason: {}.",
269 peer_id,
270 change.value,
271 peer_info.reputation,
272 change.reason,
273 );
274
275 if !peer_info.is_banned() {
276 if was_banned {
277 log::info!(
278 target: LOG_TARGET,
279 "Peer {} is now unbanned: {:+} to {}. Reason: {}.",
280 peer_id,
281 change.value,
282 peer_info.reputation,
283 change.reason,
284 );
285 }
286 return;
287 }
288
289 self.protocols.iter().for_each(|handle| handle.disconnect_peer(peer_id.into()));
291
292 if !was_banned {
294 log::warn!(
295 target: LOG_TARGET,
296 "Report {}: {:+} to {}. Reason: {}. Banned, disconnecting.",
297 peer_id,
298 change.value,
299 peer_info.reputation,
300 change.reason,
301 );
302 return;
303 }
304
305 if change.value < 0 {
308 log::debug!(
309 target: LOG_TARGET,
310 "Report {}: {:+} to {}. Reason: {}. Misbehaved during the ban threshold.",
311 peer_id,
312 change.value,
313 peer_info.reputation,
314 change.reason,
315 );
316 }
317 }
318
319 fn set_peer_role(&mut self, peer_id: &PeerId, role: ObservedRole) {
320 log::trace!(target: LOG_TARGET, "Set {peer_id} role to {role:?}");
321
322 match self.peers.entry(*peer_id) {
323 Entry::Occupied(mut entry) => {
324 entry.get_mut().role = Some(role);
325 },
326 Entry::Vacant(entry) => {
327 entry.insert(PeerInfo { role: Some(role), ..Default::default() });
328 },
329 }
330 }
331
332 fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
333 self.peers.get(peer_id).map_or(0, |info| info.reputation)
334 }
335
336 fn peer_role(&self, peer_id: &PeerId) -> Option<ObservedRole> {
337 self.peers.get(peer_id).map_or(None, |info| info.role)
338 }
339
340 fn outgoing_candidates(&self, count: usize, ignored: HashSet<PeerId>) -> Vec<PeerId> {
341 let mut candidates = self
342 .peers
343 .iter()
344 .filter_map(|(peer_id, info)| {
345 (!info.is_banned() && !ignored.contains(peer_id)).then_some((*peer_id, *info))
346 })
347 .collect::<Vec<_>>();
348 let count = std::cmp::min(count, candidates.len());
349 candidates.partial_sort(count, |(_, info1), (_, info2)| info1.cmp(info2));
350 candidates.iter().take(count).map(|(peer_id, _)| *peer_id).collect()
351
352 }
354
355 fn progress_time(&mut self, seconds_passed: u64) {
356 if seconds_passed == 0 {
357 return
358 }
359
360 self.peers
362 .iter_mut()
363 .for_each(|(_, info)| info.decay_reputation(seconds_passed));
364
365 let now = Instant::now();
367 let mut num_banned_peers: u64 = 0;
368 self.peers.retain(|_, info| {
369 if info.is_banned() {
370 num_banned_peers += 1;
371 }
372
373 info.reputation != 0 || info.last_updated + FORGET_AFTER > now
374 });
375
376 if let Some(metrics) = &self.metrics {
377 metrics.num_discovered.set(self.peers.len() as u64);
378 metrics.num_banned_peers.set(num_banned_peers);
379 }
380 }
381
382 fn add_known_peer(&mut self, peer_id: PeerId) {
383 match self.peers.entry(peer_id) {
384 Entry::Occupied(mut e) => {
385 trace!(
386 target: LOG_TARGET,
387 "Trying to add an already known peer {peer_id}, bumping `last_updated`.",
388 );
389 e.get_mut().bump_last_updated();
390 },
391 Entry::Vacant(e) => {
392 trace!(target: LOG_TARGET, "Adding a new known peer {peer_id}.");
393 e.insert(PeerInfo::default());
394 },
395 }
396 }
397}
398
/// Owner of the peer store state; hands out `PeerStoreHandle`s and drives the periodic
/// reputation decay through `run`.
#[derive(Debug)]
pub struct PeerStore {
    /// Shared, lock-protected peer store state.
    inner: Arc<Mutex<PeerStoreInner>>,
}
404
405impl PeerStore {
406 pub fn new(bootnodes: Vec<PeerId>, metrics_registry: Option<Registry>) -> Self {
408 let metrics = if let Some(registry) = &metrics_registry {
409 PeerStoreMetrics::register(registry)
410 .map_err(|err| {
411 log::error!(target: LOG_TARGET, "Failed to register peer set metrics: {}", err);
412 err
413 })
414 .ok()
415 } else {
416 None
417 };
418
419 PeerStore {
420 inner: Arc::new(Mutex::new(PeerStoreInner {
421 peers: bootnodes
422 .into_iter()
423 .map(|peer_id| (peer_id, PeerInfo::default()))
424 .collect(),
425 protocols: Vec::new(),
426 metrics,
427 })),
428 }
429 }
430
431 pub fn handle(&self) -> PeerStoreHandle {
433 PeerStoreHandle { inner: self.inner.clone() }
434 }
435
436 pub async fn run(self) {
438 let started = Instant::now();
439 let mut latest_time_update = started;
440
441 loop {
442 let now = Instant::now();
443 let seconds_passed = {
446 let elapsed_latest = latest_time_update - started;
447 let elapsed_now = now - started;
448 latest_time_update = now;
449 elapsed_now.as_secs() - elapsed_latest.as_secs()
450 };
451
452 self.inner.lock().progress_time(seconds_passed);
453 let _ = Delay::new(Duration::from_secs(1)).await;
454 }
455 }
456}
457
// Trait adapter over the inherent `PeerStore` API. Inside these methods `self.handle()` and
// `self.run()` resolve to the inherent methods of `PeerStore` (inherent methods take
// precedence over trait methods), so the calls below do not recurse.
#[async_trait::async_trait]
impl PeerStoreT for PeerStore {
    /// Return a new handle to this store as a type-erased `PeerStoreProvider`.
    fn handle(&self) -> Arc<dyn PeerStoreProvider> {
        Arc::new(self.handle())
    }

    /// Run the store's periodic time-progress loop.
    async fn run(self) {
        self.run().await;
    }
}
468
// Unit tests for reputation decay and for the banned-peer metrics.
#[cfg(test)]
mod tests {
    use super::{PeerInfo, PeerStore, PeerStoreProvider};

    // A zero reputation stays at zero no matter how much time passes.
    #[test]
    fn decaying_zero_reputation_yields_zero() {
        let mut peer_info = PeerInfo::default();
        assert_eq!(peer_info.reputation, 0);

        peer_info.decay_reputation(1);
        assert_eq!(peer_info.reputation, 0);

        peer_info.decay_reputation(100_000);
        assert_eq!(peer_info.reputation, 0);
    }

    // One second of decay lowers a positive reputation without crossing zero.
    #[test]
    fn decaying_positive_reputation_decreases_it() {
        const INITIAL_REPUTATION: i32 = 100;

        let mut peer_info = PeerInfo::default();
        peer_info.reputation = INITIAL_REPUTATION;

        peer_info.decay_reputation(1);
        assert!(peer_info.reputation >= 0);
        assert!(peer_info.reputation < INITIAL_REPUTATION);
    }

    // One second of decay raises a negative reputation without crossing zero.
    #[test]
    fn decaying_negative_reputation_increases_it() {
        const INITIAL_REPUTATION: i32 = -100;

        let mut peer_info = PeerInfo::default();
        peer_info.reputation = INITIAL_REPUTATION;

        peer_info.decay_reputation(1);
        assert!(peer_info.reputation <= 0);
        assert!(peer_info.reputation > INITIAL_REPUTATION);
    }

    // Starting from `i32::MAX`, the reputation is still positive after half of `SECONDS`
    // and exactly zero after all of them.
    #[test]
    fn decaying_max_reputation_finally_yields_zero() {
        const INITIAL_REPUTATION: i32 = i32::MAX;
        const SECONDS: u64 = 3544;

        let mut peer_info = PeerInfo::default();
        peer_info.reputation = INITIAL_REPUTATION;

        peer_info.decay_reputation(SECONDS / 2);
        assert!(peer_info.reputation > 0);

        peer_info.decay_reputation(SECONDS / 2);
        assert_eq!(peer_info.reputation, 0);
    }

    // Mirror of the test above, starting from `i32::MIN`.
    #[test]
    fn decaying_min_reputation_finally_yields_zero() {
        const INITIAL_REPUTATION: i32 = i32::MIN;
        const SECONDS: u64 = 3544;

        let mut peer_info = PeerInfo::default();
        peer_info.reputation = INITIAL_REPUTATION;

        peer_info.decay_reputation(SECONDS / 2);
        assert!(peer_info.reputation < 0);

        peer_info.decay_reputation(SECONDS / 2);
        assert_eq!(peer_info.reputation, 0);
    }

    // The metrics report the number of known and banned peers after each time tick.
    #[test]
    fn report_banned_peers() {
        let peer_a = sc_network_types::PeerId::random();
        let peer_b = sc_network_types::PeerId::random();
        let peer_c = sc_network_types::PeerId::random();

        let metrics_registry = prometheus_endpoint::Registry::new();
        let peerstore = PeerStore::new(
            vec![peer_a, peer_b, peer_c].into_iter().map(Into::into).collect(),
            Some(metrics_registry),
        );
        let metrics = peerstore.inner.lock().metrics.as_ref().unwrap().clone();
        let handle = peerstore.handle();

        // Metrics are only written by `progress_time`, so tick once before reading them.
        handle.inner.lock().progress_time(1);
        assert_eq!(metrics.num_discovered.get(), 3);
        assert_eq!(metrics.num_banned_peers.get(), 0);

        // Push two of the three peers down to the minimum reputation, which bans them.
        handle.report_peer(
            peer_a,
            sc_network_common::types::ReputationChange { value: i32::MIN, reason: "test".into() },
        );
        handle.report_peer(
            peer_b,
            sc_network_common::types::ReputationChange { value: i32::MIN, reason: "test".into() },
        );

        // Tick again so the metrics pick up the two banned peers.
        handle.inner.lock().progress_time(1);
        assert_eq!(metrics.num_discovered.get(), 3);
        assert_eq!(metrics.num_banned_peers.get(), 2);
    }
}