sc_network/
peer_store.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! [`PeerStore`] manages peer reputations and provides connection candidates to
20//! [`crate::protocol_controller::ProtocolController`].
21
22use crate::service::{metrics::PeerStoreMetrics, traits::PeerStore as PeerStoreT};
23
24use libp2p::PeerId;
25use log::trace;
26use parking_lot::Mutex;
27use partial_sort::PartialSort;
28use prometheus_endpoint::Registry;
29use sc_network_common::{role::ObservedRole, types::ReputationChange};
30use std::{
31	cmp::{Ord, Ordering, PartialOrd},
32	collections::{hash_map::Entry, HashMap, HashSet},
33	fmt::Debug,
34	sync::Arc,
35	time::{Duration, Instant},
36};
37use wasm_timer::Delay;
38
39/// Log target for this file.
40pub const LOG_TARGET: &str = "peerset";
41
42/// We don't accept nodes whose reputation is under this value.
43pub const BANNED_THRESHOLD: i32 = 71 * (i32::MIN / 100);
44/// Reputation change for a node when we get disconnected from it.
45const DISCONNECT_REPUTATION_CHANGE: i32 = -256;
46/// Relative decrement of a reputation value that is applied every second. I.e., for inverse
47/// decrement of 200 we decrease absolute value of the reputation by 1/200.
48///
49/// This corresponds to a factor of `k = 0.955`, where k = 1 - 1 / INVERSE_DECREMENT.
50///
51/// It takes ~ `ln(0.5) / ln(k)` seconds to reduce the reputation by half, or 138.63 seconds for the
52/// values above.
53///
54/// In this setup:
55/// - `i32::MAX` becomes 0 in exactly 3544 seconds, or approximately 59 minutes
56/// - `i32::MIN` becomes 0 in exactly 3544 seconds, or approximately 59 minutes
57/// - `i32::MIN` escapes the banned threshold in 69 seconds
58const INVERSE_DECREMENT: i32 = 200;
59/// Amount of time between the moment we last updated the [`PeerStore`] entry and the moment we
60/// remove it, once the reputation value reaches 0.
61const FORGET_AFTER: Duration = Duration::from_secs(3600);
62
63/// Trait describing the required functionality from a `Peerset` handle.
64pub trait ProtocolHandle: Debug + Send + Sync {
65	/// Disconnect peer.
66	fn disconnect_peer(&self, peer_id: sc_network_types::PeerId);
67}
68
69/// Trait providing peer reputation management and connection candidates.
70pub trait PeerStoreProvider: Debug + Send + Sync {
71	/// Check whether the peer is banned.
72	fn is_banned(&self, peer_id: &sc_network_types::PeerId) -> bool;
73
74	/// Register a protocol handle to disconnect peers whose reputation drops below the threshold.
75	fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandle>);
76
77	/// Report peer disconnection for reputation adjustment.
78	fn report_disconnect(&self, peer_id: sc_network_types::PeerId);
79
80	/// Adjust peer reputation.
81	fn report_peer(&self, peer_id: sc_network_types::PeerId, change: ReputationChange);
82
83	/// Set peer role.
84	fn set_peer_role(&self, peer_id: &sc_network_types::PeerId, role: ObservedRole);
85
86	/// Get peer reputation.
87	fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32;
88
89	/// Get peer role, if available.
90	fn peer_role(&self, peer_id: &sc_network_types::PeerId) -> Option<ObservedRole>;
91
92	/// Get candidates with highest reputations for initiating outgoing connections.
93	fn outgoing_candidates(
94		&self,
95		count: usize,
96		ignored: HashSet<sc_network_types::PeerId>,
97	) -> Vec<sc_network_types::PeerId>;
98
99	/// Add known peer.
100	fn add_known_peer(&self, peer_id: sc_network_types::PeerId);
101}
102
103/// Actual implementation of peer reputations and connection candidates provider.
104#[derive(Debug, Clone)]
105pub struct PeerStoreHandle {
106	inner: Arc<Mutex<PeerStoreInner>>,
107}
108
109impl PeerStoreProvider for PeerStoreHandle {
110	fn is_banned(&self, peer_id: &sc_network_types::PeerId) -> bool {
111		self.inner.lock().is_banned(&peer_id.into())
112	}
113
114	fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandle>) {
115		self.inner.lock().register_protocol(protocol_handle);
116	}
117
118	fn report_disconnect(&self, peer_id: sc_network_types::PeerId) {
119		let mut inner = self.inner.lock();
120		inner.report_disconnect(peer_id.into())
121	}
122
123	fn report_peer(&self, peer_id: sc_network_types::PeerId, change: ReputationChange) {
124		let mut inner = self.inner.lock();
125		inner.report_peer(peer_id.into(), change)
126	}
127
128	fn set_peer_role(&self, peer_id: &sc_network_types::PeerId, role: ObservedRole) {
129		let mut inner = self.inner.lock();
130		inner.set_peer_role(&peer_id.into(), role)
131	}
132
133	fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32 {
134		self.inner.lock().peer_reputation(&peer_id.into())
135	}
136
137	fn peer_role(&self, peer_id: &sc_network_types::PeerId) -> Option<ObservedRole> {
138		self.inner.lock().peer_role(&peer_id.into())
139	}
140
141	fn outgoing_candidates(
142		&self,
143		count: usize,
144		ignored: HashSet<sc_network_types::PeerId>,
145	) -> Vec<sc_network_types::PeerId> {
146		self.inner
147			.lock()
148			.outgoing_candidates(count, ignored.iter().map(|peer_id| (*peer_id).into()).collect())
149			.iter()
150			.map(|peer_id| peer_id.into())
151			.collect()
152	}
153
154	fn add_known_peer(&self, peer_id: sc_network_types::PeerId) {
155		self.inner.lock().add_known_peer(peer_id.into());
156	}
157}
158
159#[derive(Debug, Clone, Copy)]
160struct PeerInfo {
161	/// Reputation of the peer.
162	reputation: i32,
163
164	/// Instant when the peer was last updated.
165	last_updated: Instant,
166
167	/// Role of the peer, if known.
168	role: Option<ObservedRole>,
169}
170
171impl Default for PeerInfo {
172	fn default() -> Self {
173		Self { reputation: 0, last_updated: Instant::now(), role: None }
174	}
175}
176
177impl PartialEq for PeerInfo {
178	fn eq(&self, other: &Self) -> bool {
179		self.reputation == other.reputation
180	}
181}
182
183impl Eq for PeerInfo {}
184
185impl Ord for PeerInfo {
186	// We define reverse order by reputation values.
187	fn cmp(&self, other: &Self) -> Ordering {
188		self.reputation.cmp(&other.reputation).reverse()
189	}
190}
191
192impl PartialOrd for PeerInfo {
193	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
194		Some(self.cmp(other))
195	}
196}
197
198impl PeerInfo {
199	fn is_banned(&self) -> bool {
200		self.reputation < BANNED_THRESHOLD
201	}
202
203	fn add_reputation(&mut self, increment: i32) {
204		self.reputation = self.reputation.saturating_add(increment);
205		self.bump_last_updated();
206	}
207
208	fn decay_reputation(&mut self, seconds_passed: u64) {
209		// Note that decaying the reputation value happens "on its own",
210		// so we don't do `bump_last_updated()`.
211		for _ in 0..seconds_passed {
212			let mut diff = self.reputation / INVERSE_DECREMENT;
213			if diff == 0 && self.reputation < 0 {
214				diff = -1;
215			} else if diff == 0 && self.reputation > 0 {
216				diff = 1;
217			}
218
219			self.reputation = self.reputation.saturating_sub(diff);
220
221			if self.reputation == 0 {
222				break
223			}
224		}
225	}
226
227	fn bump_last_updated(&mut self) {
228		self.last_updated = Instant::now();
229	}
230}
231
232#[derive(Debug)]
233struct PeerStoreInner {
234	peers: HashMap<PeerId, PeerInfo>,
235	protocols: Vec<Arc<dyn ProtocolHandle>>,
236	metrics: Option<PeerStoreMetrics>,
237}
238
239impl PeerStoreInner {
240	fn is_banned(&self, peer_id: &PeerId) -> bool {
241		self.peers.get(peer_id).map_or(false, |info| info.is_banned())
242	}
243
244	fn register_protocol(&mut self, protocol_handle: Arc<dyn ProtocolHandle>) {
245		self.protocols.push(protocol_handle);
246	}
247
248	fn report_disconnect(&mut self, peer_id: PeerId) {
249		let peer_info = self.peers.entry(peer_id).or_default();
250		peer_info.add_reputation(DISCONNECT_REPUTATION_CHANGE);
251
252		log::trace!(
253			target: LOG_TARGET,
254			"Peer {} disconnected, reputation: {:+} to {}",
255			peer_id,
256			DISCONNECT_REPUTATION_CHANGE,
257			peer_info.reputation,
258		);
259	}
260
261	fn report_peer(&mut self, peer_id: PeerId, change: ReputationChange) {
262		let peer_info = self.peers.entry(peer_id).or_default();
263		let was_banned = peer_info.is_banned();
264		peer_info.add_reputation(change.value);
265
266		log::trace!(
267			target: LOG_TARGET,
268			"Report {}: {:+} to {}. Reason: {}.",
269			peer_id,
270			change.value,
271			peer_info.reputation,
272			change.reason,
273		);
274
275		if !peer_info.is_banned() {
276			if was_banned {
277				log::info!(
278					target: LOG_TARGET,
279					"Peer {} is now unbanned: {:+} to {}. Reason: {}.",
280					peer_id,
281					change.value,
282					peer_info.reputation,
283					change.reason,
284				);
285			}
286			return;
287		}
288
289		// Peer is currently banned, disconnect it from all protocols.
290		self.protocols.iter().for_each(|handle| handle.disconnect_peer(peer_id.into()));
291
292		// The peer is banned for the first time.
293		if !was_banned {
294			log::warn!(
295				target: LOG_TARGET,
296				"Report {}: {:+} to {}. Reason: {}. Banned, disconnecting.",
297				peer_id,
298				change.value,
299				peer_info.reputation,
300				change.reason,
301			);
302			return;
303		}
304
305		// The peer was already banned and it got another negative report.
306		// This may happen during a batch report.
307		if change.value < 0 {
308			log::debug!(
309				target: LOG_TARGET,
310				"Report {}: {:+} to {}. Reason: {}. Misbehaved during the ban threshold.",
311				peer_id,
312				change.value,
313				peer_info.reputation,
314				change.reason,
315			);
316		}
317	}
318
319	fn set_peer_role(&mut self, peer_id: &PeerId, role: ObservedRole) {
320		log::trace!(target: LOG_TARGET, "Set {peer_id} role to {role:?}");
321
322		match self.peers.entry(*peer_id) {
323			Entry::Occupied(mut entry) => {
324				entry.get_mut().role = Some(role);
325			},
326			Entry::Vacant(entry) => {
327				entry.insert(PeerInfo { role: Some(role), ..Default::default() });
328			},
329		}
330	}
331
332	fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
333		self.peers.get(peer_id).map_or(0, |info| info.reputation)
334	}
335
336	fn peer_role(&self, peer_id: &PeerId) -> Option<ObservedRole> {
337		self.peers.get(peer_id).map_or(None, |info| info.role)
338	}
339
340	fn outgoing_candidates(&self, count: usize, ignored: HashSet<PeerId>) -> Vec<PeerId> {
341		let mut candidates = self
342			.peers
343			.iter()
344			.filter_map(|(peer_id, info)| {
345				(!info.is_banned() && !ignored.contains(peer_id)).then_some((*peer_id, *info))
346			})
347			.collect::<Vec<_>>();
348		let count = std::cmp::min(count, candidates.len());
349		candidates.partial_sort(count, |(_, info1), (_, info2)| info1.cmp(info2));
350		candidates.iter().take(count).map(|(peer_id, _)| *peer_id).collect()
351
352		// TODO: keep the peers sorted (in a "bi-multi-map"?) to not repeat sorting every time.
353	}
354
355	fn progress_time(&mut self, seconds_passed: u64) {
356		if seconds_passed == 0 {
357			return
358		}
359
360		// Drive reputation values towards 0.
361		self.peers
362			.iter_mut()
363			.for_each(|(_, info)| info.decay_reputation(seconds_passed));
364
365		// Retain only entries with non-zero reputation values or not expired ones.
366		let now = Instant::now();
367		let mut num_banned_peers: u64 = 0;
368		self.peers.retain(|_, info| {
369			if info.is_banned() {
370				num_banned_peers += 1;
371			}
372
373			info.reputation != 0 || info.last_updated + FORGET_AFTER > now
374		});
375
376		if let Some(metrics) = &self.metrics {
377			metrics.num_discovered.set(self.peers.len() as u64);
378			metrics.num_banned_peers.set(num_banned_peers);
379		}
380	}
381
382	fn add_known_peer(&mut self, peer_id: PeerId) {
383		match self.peers.entry(peer_id) {
384			Entry::Occupied(mut e) => {
385				trace!(
386					target: LOG_TARGET,
387					"Trying to add an already known peer {peer_id}, bumping `last_updated`.",
388				);
389				e.get_mut().bump_last_updated();
390			},
391			Entry::Vacant(e) => {
392				trace!(target: LOG_TARGET, "Adding a new known peer {peer_id}.");
393				e.insert(PeerInfo::default());
394			},
395		}
396	}
397}
398
399/// Worker part of [`PeerStoreHandle`]
400#[derive(Debug)]
401pub struct PeerStore {
402	inner: Arc<Mutex<PeerStoreInner>>,
403}
404
405impl PeerStore {
406	/// Create a new peer store from the list of bootnodes.
407	pub fn new(bootnodes: Vec<PeerId>, metrics_registry: Option<Registry>) -> Self {
408		let metrics = if let Some(registry) = &metrics_registry {
409			PeerStoreMetrics::register(registry)
410				.map_err(|err| {
411					log::error!(target: LOG_TARGET, "Failed to register peer set metrics: {}", err);
412					err
413				})
414				.ok()
415		} else {
416			None
417		};
418
419		PeerStore {
420			inner: Arc::new(Mutex::new(PeerStoreInner {
421				peers: bootnodes
422					.into_iter()
423					.map(|peer_id| (peer_id, PeerInfo::default()))
424					.collect(),
425				protocols: Vec::new(),
426				metrics,
427			})),
428		}
429	}
430
431	/// Get `PeerStoreHandle`.
432	pub fn handle(&self) -> PeerStoreHandle {
433		PeerStoreHandle { inner: self.inner.clone() }
434	}
435
436	/// Drive the `PeerStore`, decaying reputation values over time and removing expired entries.
437	pub async fn run(self) {
438		let started = Instant::now();
439		let mut latest_time_update = started;
440
441		loop {
442			let now = Instant::now();
443			// We basically do `(now - self.latest_update).as_secs()`, except that by the way we do
444			// it we know that we're not going to miss seconds because of rounding to integers.
445			let seconds_passed = {
446				let elapsed_latest = latest_time_update - started;
447				let elapsed_now = now - started;
448				latest_time_update = now;
449				elapsed_now.as_secs() - elapsed_latest.as_secs()
450			};
451
452			self.inner.lock().progress_time(seconds_passed);
453			let _ = Delay::new(Duration::from_secs(1)).await;
454		}
455	}
456}
457
458#[async_trait::async_trait]
459impl PeerStoreT for PeerStore {
460	fn handle(&self) -> Arc<dyn PeerStoreProvider> {
461		Arc::new(self.handle())
462	}
463
464	async fn run(self) {
465		self.run().await;
466	}
467}
468
469#[cfg(test)]
470mod tests {
471	use super::{PeerInfo, PeerStore, PeerStoreProvider};
472
473	#[test]
474	fn decaying_zero_reputation_yields_zero() {
475		let mut peer_info = PeerInfo::default();
476		assert_eq!(peer_info.reputation, 0);
477
478		peer_info.decay_reputation(1);
479		assert_eq!(peer_info.reputation, 0);
480
481		peer_info.decay_reputation(100_000);
482		assert_eq!(peer_info.reputation, 0);
483	}
484
485	#[test]
486	fn decaying_positive_reputation_decreases_it() {
487		const INITIAL_REPUTATION: i32 = 100;
488
489		let mut peer_info = PeerInfo::default();
490		peer_info.reputation = INITIAL_REPUTATION;
491
492		peer_info.decay_reputation(1);
493		assert!(peer_info.reputation >= 0);
494		assert!(peer_info.reputation < INITIAL_REPUTATION);
495	}
496
497	#[test]
498	fn decaying_negative_reputation_increases_it() {
499		const INITIAL_REPUTATION: i32 = -100;
500
501		let mut peer_info = PeerInfo::default();
502		peer_info.reputation = INITIAL_REPUTATION;
503
504		peer_info.decay_reputation(1);
505		assert!(peer_info.reputation <= 0);
506		assert!(peer_info.reputation > INITIAL_REPUTATION);
507	}
508
509	#[test]
510	fn decaying_max_reputation_finally_yields_zero() {
511		const INITIAL_REPUTATION: i32 = i32::MAX;
512		const SECONDS: u64 = 3544;
513
514		let mut peer_info = PeerInfo::default();
515		peer_info.reputation = INITIAL_REPUTATION;
516
517		peer_info.decay_reputation(SECONDS / 2);
518		assert!(peer_info.reputation > 0);
519
520		peer_info.decay_reputation(SECONDS / 2);
521		assert_eq!(peer_info.reputation, 0);
522	}
523
524	#[test]
525	fn decaying_min_reputation_finally_yields_zero() {
526		const INITIAL_REPUTATION: i32 = i32::MIN;
527		const SECONDS: u64 = 3544;
528
529		let mut peer_info = PeerInfo::default();
530		peer_info.reputation = INITIAL_REPUTATION;
531
532		peer_info.decay_reputation(SECONDS / 2);
533		assert!(peer_info.reputation < 0);
534
535		peer_info.decay_reputation(SECONDS / 2);
536		assert_eq!(peer_info.reputation, 0);
537	}
538
539	#[test]
540	fn report_banned_peers() {
541		let peer_a = sc_network_types::PeerId::random();
542		let peer_b = sc_network_types::PeerId::random();
543		let peer_c = sc_network_types::PeerId::random();
544
545		let metrics_registry = prometheus_endpoint::Registry::new();
546		let peerstore = PeerStore::new(
547			vec![peer_a, peer_b, peer_c].into_iter().map(Into::into).collect(),
548			Some(metrics_registry),
549		);
550		let metrics = peerstore.inner.lock().metrics.as_ref().unwrap().clone();
551		let handle = peerstore.handle();
552
553		// Check initial state. Advance time to propagate peers.
554		handle.inner.lock().progress_time(1);
555		assert_eq!(metrics.num_discovered.get(), 3);
556		assert_eq!(metrics.num_banned_peers.get(), 0);
557
558		// Report 2 peers with a negative reputation.
559		handle.report_peer(
560			peer_a,
561			sc_network_common::types::ReputationChange { value: i32::MIN, reason: "test".into() },
562		);
563		handle.report_peer(
564			peer_b,
565			sc_network_common::types::ReputationChange { value: i32::MIN, reason: "test".into() },
566		);
567
568		// Advance time to propagate banned peers.
569		handle.inner.lock().progress_time(1);
570		assert_eq!(metrics.num_discovered.get(), 3);
571		assert_eq!(metrics.num_banned_peers.get(), 2);
572	}
573}