sc_network/
peer_info.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
//! [`PeerInfoBehaviour`] is an implementation of `NetworkBehaviour` that holds information about
//! peers in cache.
21
22use crate::utils::interval;
23use either::Either;
24
25use fnv::FnvHashMap;
26use futures::prelude::*;
27use libp2p::{
28	core::{ConnectedPoint, Endpoint},
29	identify::{
30		Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent,
31		Info as IdentifyInfo,
32	},
33	identity::PublicKey,
34	ping::{Behaviour as Ping, Config as PingConfig, Event as PingEvent},
35	swarm::{
36		behaviour::{
37			AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure,
38			ExternalAddrConfirmed, FromSwarm, ListenFailure,
39		},
40		ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, ConnectionId,
41		NetworkBehaviour, NewExternalAddrCandidate, PollParameters, THandler, THandlerInEvent,
42		THandlerOutEvent, ToSwarm,
43	},
44	Multiaddr, PeerId,
45};
46use log::{debug, error, trace};
47use parking_lot::Mutex;
48use smallvec::SmallVec;
49
50use std::{
51	collections::{hash_map::Entry, HashSet, VecDeque},
52	pin::Pin,
53	sync::Arc,
54	task::{Context, Poll},
55	time::{Duration, Instant},
56};
57
/// Time after we disconnect from a node before we purge its information from the cache.
///
/// Added to the current time when the last established connection to a peer closes, to compute
/// the instant at which the peer's cache entry expires.
const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60);
/// Interval at which we perform garbage collection on the node info.
///
/// Expired entries are only removed on these ticks, so an entry may outlive its expiry instant
/// until the next tick fires.
const GARBAGE_COLLECT_INTERVAL: Duration = Duration::from_secs(2 * 60);
62
/// Implementation of `NetworkBehaviour` that holds information about peers in cache.
///
/// Wraps the libp2p `Ping` and `Identify` behaviours and records, per peer, the connected
/// endpoints, the reported client version and the latest ping time.
pub struct PeerInfoBehaviour {
	/// Periodically ping nodes, and close the connection if it's unresponsive.
	ping: Ping,
	/// Periodically identifies the remote and responds to incoming requests.
	identify: Identify,
	/// Information that we know about all nodes. Entries of disconnected nodes are kept until
	/// they expire and the next garbage collection pass removes them.
	nodes_info: FnvHashMap<PeerId, NodeInfo>,
	/// Interval at which we perform garbage collection in `nodes_info`.
	garbage_collect: Pin<Box<dyn Stream<Item = ()> + Send>>,
	/// Record keeping of external addresses. Data is queried by the `NetworkService`.
	external_addresses: ExternalAddresses,
	/// Pending events to emit to [`Swarm`](libp2p::swarm::Swarm). One queued event is returned
	/// at the start of each `poll` call.
	pending_actions: VecDeque<ToSwarm<PeerInfoEvent, THandlerInEvent<PeerInfoBehaviour>>>,
}
78
/// Cached information about a node we are connected to, or were connected to until recently.
#[derive(Debug)]
struct NodeInfo {
	/// When we will remove the entry about this node from the list, or `None` if we're connected
	/// to the node.
	info_expire: Option<Instant>,
	/// List of connected endpoints; one is pushed for every established connection and removed
	/// again when that connection closes, so the list is empty while we are disconnected.
	endpoints: SmallVec<[ConnectedPoint; crate::MAX_CONNECTIONS_PER_PEER]>,
	/// Version reported by the remote, or `None` if unknown.
	client_version: Option<String>,
	/// Latest ping time with this node, or `None` if no successful ping has been recorded.
	latest_ping: Option<Duration>,
}
92
93impl NodeInfo {
94	fn new(endpoint: ConnectedPoint) -> Self {
95		let mut endpoints = SmallVec::new();
96		endpoints.push(endpoint);
97		Self { info_expire: None, endpoints, client_version: None, latest_ping: None }
98	}
99}
100
/// Utility struct for tracking external addresses. The data is shared with the `NetworkService`.
///
/// Cloning this struct clones the `Arc`, so all clones observe the same underlying set.
#[derive(Debug, Clone, Default)]
pub struct ExternalAddresses {
	/// Shared, mutex-protected set of the currently known external addresses.
	addresses: Arc<Mutex<HashSet<Multiaddr>>>,
}
106
107impl ExternalAddresses {
108	/// Add an external address.
109	pub fn add(&mut self, addr: Multiaddr) {
110		self.addresses.lock().insert(addr);
111	}
112
113	/// Remove an external address.
114	pub fn remove(&mut self, addr: &Multiaddr) {
115		self.addresses.lock().remove(addr);
116	}
117}
118
119impl PeerInfoBehaviour {
120	/// Builds a new `PeerInfoBehaviour`.
121	pub fn new(
122		user_agent: String,
123		local_public_key: PublicKey,
124		external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
125	) -> Self {
126		let identify = {
127			let cfg = IdentifyConfig::new("/substrate/1.0".to_string(), local_public_key)
128				.with_agent_version(user_agent)
129				// We don't need any peer information cached.
130				.with_cache_size(0);
131			Identify::new(cfg)
132		};
133
134		Self {
135			ping: Ping::new(PingConfig::new()),
136			identify,
137			nodes_info: FnvHashMap::default(),
138			garbage_collect: Box::pin(interval(GARBAGE_COLLECT_INTERVAL)),
139			external_addresses: ExternalAddresses { addresses: external_addresses },
140			pending_actions: Default::default(),
141		}
142	}
143
144	/// Borrows `self` and returns a struct giving access to the information about a node.
145	///
146	/// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes
147	/// we're connected to, meaning that if `None` is returned then we're not connected to that
148	/// node.
149	pub fn node(&self, peer_id: &PeerId) -> Option<Node> {
150		self.nodes_info.get(peer_id).map(Node)
151	}
152
153	/// Inserts a ping time in the cache. Has no effect if we don't have any entry for that node,
154	/// which shouldn't happen.
155	fn handle_ping_report(
156		&mut self,
157		peer_id: &PeerId,
158		ping_time: Duration,
159		connection: ConnectionId,
160	) {
161		trace!(target: "sub-libp2p", "Ping time with {:?} via {:?}: {:?}", peer_id, connection, ping_time);
162		if let Some(entry) = self.nodes_info.get_mut(peer_id) {
163			entry.latest_ping = Some(ping_time);
164		} else {
165			error!(target: "sub-libp2p",
166				"Received ping from node we're not connected to {:?} via {:?}", peer_id, connection);
167		}
168	}
169
170	/// Inserts an identify record in the cache. Has no effect if we don't have any entry for that
171	/// node, which shouldn't happen.
172	fn handle_identify_report(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
173		trace!(target: "sub-libp2p", "Identified {:?} => {:?}", peer_id, info);
174		if let Some(entry) = self.nodes_info.get_mut(peer_id) {
175			entry.client_version = Some(info.agent_version.clone());
176		} else {
177			error!(target: "sub-libp2p",
178				"Received pong from node we're not connected to {:?}", peer_id);
179		}
180	}
181}
182
183/// Gives access to the information about a node.
184pub struct Node<'a>(&'a NodeInfo);
185
186impl<'a> Node<'a> {
187	/// Returns the endpoint of an established connection to the peer.
188	///
189	/// Returns `None` if we are disconnected from the node.
190	pub fn endpoint(&self) -> Option<&'a ConnectedPoint> {
191		self.0.endpoints.get(0)
192	}
193
194	/// Returns the latest version information we know of.
195	pub fn client_version(&self) -> Option<&'a str> {
196		self.0.client_version.as_deref()
197	}
198
199	/// Returns the latest ping time we know of for this node. `None` if we never successfully
200	/// pinged this node.
201	pub fn latest_ping(&self) -> Option<Duration> {
202		self.0.latest_ping
203	}
204}
205
/// Event that can be emitted by the behaviour.
#[derive(Debug)]
pub enum PeerInfoEvent {
	/// We have obtained identity information from a peer, including the addresses it is listening
	/// on.
	///
	/// Emitted from `poll` each time the inner identify behaviour reports received information.
	Identified {
		/// Id of the peer that has been identified.
		peer_id: PeerId,
		/// Information about the peer, as reported by the identify protocol.
		info: IdentifyInfo,
	},
}
218
219impl NetworkBehaviour for PeerInfoBehaviour {
	/// Combination of both inner handlers: the ping handler is the first (left) half and the
	/// identify handler the second (right) half.
	type ConnectionHandler = ConnectionHandlerSelect<
		<Ping as NetworkBehaviour>::ConnectionHandler,
		<Identify as NetworkBehaviour>::ConnectionHandler,
	>;
	/// Event type this behaviour reports to the swarm.
	type ToSwarm = PeerInfoEvent;
225
226	fn handle_pending_inbound_connection(
227		&mut self,
228		connection_id: ConnectionId,
229		local_addr: &Multiaddr,
230		remote_addr: &Multiaddr,
231	) -> Result<(), ConnectionDenied> {
232		self.ping
233			.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
234		self.identify
235			.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
236	}
237
238	fn handle_pending_outbound_connection(
239		&mut self,
240		_connection_id: ConnectionId,
241		_maybe_peer: Option<PeerId>,
242		_addresses: &[Multiaddr],
243		_effective_role: Endpoint,
244	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
245		// Only `Discovery::handle_pending_outbound_connection` must be returning addresses to
246		// ensure that we don't return unwanted addresses.
247		Ok(Vec::new())
248	}
249
250	fn handle_established_inbound_connection(
251		&mut self,
252		connection_id: ConnectionId,
253		peer: PeerId,
254		local_addr: &Multiaddr,
255		remote_addr: &Multiaddr,
256	) -> Result<THandler<Self>, ConnectionDenied> {
257		let ping_handler = self.ping.handle_established_inbound_connection(
258			connection_id,
259			peer,
260			local_addr,
261			remote_addr,
262		)?;
263		let identify_handler = self.identify.handle_established_inbound_connection(
264			connection_id,
265			peer,
266			local_addr,
267			remote_addr,
268		)?;
269		Ok(ping_handler.select(identify_handler))
270	}
271
272	fn handle_established_outbound_connection(
273		&mut self,
274		connection_id: ConnectionId,
275		peer: PeerId,
276		addr: &Multiaddr,
277		role_override: Endpoint,
278	) -> Result<THandler<Self>, ConnectionDenied> {
279		let ping_handler = self.ping.handle_established_outbound_connection(
280			connection_id,
281			peer,
282			addr,
283			role_override,
284		)?;
285		let identify_handler = self.identify.handle_established_outbound_connection(
286			connection_id,
287			peer,
288			addr,
289			role_override,
290		)?;
291		Ok(ping_handler.select(identify_handler))
292	}
293
294	fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
295		match event {
296			FromSwarm::ConnectionEstablished(
297				e @ ConnectionEstablished { peer_id, endpoint, .. },
298			) => {
299				self.ping.on_swarm_event(FromSwarm::ConnectionEstablished(e));
300				self.identify.on_swarm_event(FromSwarm::ConnectionEstablished(e));
301
302				match self.nodes_info.entry(peer_id) {
303					Entry::Vacant(e) => {
304						e.insert(NodeInfo::new(endpoint.clone()));
305					},
306					Entry::Occupied(e) => {
307						let e = e.into_mut();
308						if e.info_expire.as_ref().map(|exp| *exp < Instant::now()).unwrap_or(false)
309						{
310							e.client_version = None;
311							e.latest_ping = None;
312						}
313						e.info_expire = None;
314						e.endpoints.push(endpoint.clone());
315					},
316				}
317			},
318			FromSwarm::ConnectionClosed(ConnectionClosed {
319				peer_id,
320				connection_id,
321				endpoint,
322				handler,
323				remaining_established,
324			}) => {
325				let (ping_handler, identity_handler) = handler.into_inner();
326				self.ping.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
327					peer_id,
328					connection_id,
329					endpoint,
330					handler: ping_handler,
331					remaining_established,
332				}));
333				self.identify.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
334					peer_id,
335					connection_id,
336					endpoint,
337					handler: identity_handler,
338					remaining_established,
339				}));
340
341				if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
342					if remaining_established == 0 {
343						entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
344					}
345					entry.endpoints.retain(|ep| ep != endpoint)
346				} else {
347					error!(target: "sub-libp2p",
348						"Unknown connection to {:?} closed: {:?}", peer_id, endpoint);
349				}
350			},
351			FromSwarm::DialFailure(DialFailure { peer_id, error, connection_id }) => {
352				self.ping.on_swarm_event(FromSwarm::DialFailure(DialFailure {
353					peer_id,
354					error,
355					connection_id,
356				}));
357				self.identify.on_swarm_event(FromSwarm::DialFailure(DialFailure {
358					peer_id,
359					error,
360					connection_id,
361				}));
362			},
363			FromSwarm::ListenerClosed(e) => {
364				self.ping.on_swarm_event(FromSwarm::ListenerClosed(e));
365				self.identify.on_swarm_event(FromSwarm::ListenerClosed(e));
366			},
367			FromSwarm::ListenFailure(ListenFailure {
368				local_addr,
369				send_back_addr,
370				error,
371				connection_id,
372			}) => {
373				self.ping.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
374					local_addr,
375					send_back_addr,
376					error,
377					connection_id,
378				}));
379				self.identify.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
380					local_addr,
381					send_back_addr,
382					error,
383					connection_id,
384				}));
385			},
386			FromSwarm::ListenerError(e) => {
387				self.ping.on_swarm_event(FromSwarm::ListenerError(e));
388				self.identify.on_swarm_event(FromSwarm::ListenerError(e));
389			},
390			FromSwarm::ExternalAddrExpired(e) => {
391				self.ping.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
392				self.identify.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
393			},
394			FromSwarm::NewListener(e) => {
395				self.ping.on_swarm_event(FromSwarm::NewListener(e));
396				self.identify.on_swarm_event(FromSwarm::NewListener(e));
397			},
398			FromSwarm::ExpiredListenAddr(e) => {
399				self.ping.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
400				self.identify.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
401				self.external_addresses.remove(e.addr);
402			},
403			FromSwarm::NewExternalAddrCandidate(e @ NewExternalAddrCandidate { addr }) => {
404				self.ping.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
405				self.identify.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
406
407				// Manually confirm all external address candidates.
408				// TODO: consider adding [AutoNAT protocol](https://docs.rs/libp2p/0.52.3/libp2p/autonat/index.html)
409				// (must go through the polkadot protocol spec) or implemeting heuristics for
410				// approving external address candidates. This can be done, for example, by
411				// approving only addresses reported by multiple peers.
412				// See also https://github.com/libp2p/rust-libp2p/pull/4721 introduced
413				// in libp2p v0.53 for heuristics approach.
414				self.pending_actions.push_back(ToSwarm::ExternalAddrConfirmed(addr.clone()));
415			},
416			FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
417				self.ping.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
418				self.identify.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
419				self.external_addresses.add(addr.clone());
420			},
421			FromSwarm::AddressChange(e @ AddressChange { peer_id, old, new, .. }) => {
422				self.ping.on_swarm_event(FromSwarm::AddressChange(e));
423				self.identify.on_swarm_event(FromSwarm::AddressChange(e));
424
425				if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
426					if let Some(endpoint) = entry.endpoints.iter_mut().find(|e| e == &old) {
427						*endpoint = new.clone();
428					} else {
429						error!(target: "sub-libp2p",
430							"Unknown address change for peer {:?} from {:?} to {:?}", peer_id, old, new);
431					}
432				} else {
433					error!(target: "sub-libp2p",
434						"Unknown peer {:?} to change address from {:?} to {:?}", peer_id, old, new);
435				}
436			},
437			FromSwarm::NewListenAddr(e) => {
438				self.ping.on_swarm_event(FromSwarm::NewListenAddr(e));
439				self.identify.on_swarm_event(FromSwarm::NewListenAddr(e));
440			},
441		}
442	}
443
444	fn on_connection_handler_event(
445		&mut self,
446		peer_id: PeerId,
447		connection_id: ConnectionId,
448		event: THandlerOutEvent<Self>,
449	) {
450		match event {
451			Either::Left(event) =>
452				self.ping.on_connection_handler_event(peer_id, connection_id, event),
453			Either::Right(event) =>
454				self.identify.on_connection_handler_event(peer_id, connection_id, event),
455		}
456	}
457
458	fn poll(
459		&mut self,
460		cx: &mut Context,
461		params: &mut impl PollParameters,
462	) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
463		if let Some(event) = self.pending_actions.pop_front() {
464			return Poll::Ready(event)
465		}
466
467		loop {
468			match self.ping.poll(cx, params) {
469				Poll::Pending => break,
470				Poll::Ready(ToSwarm::GenerateEvent(ev)) => {
471					if let PingEvent { peer, result: Ok(rtt), connection } = ev {
472						self.handle_ping_report(&peer, rtt, connection)
473					}
474				},
475				Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }),
476				Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) =>
477					return Poll::Ready(ToSwarm::NotifyHandler {
478						peer_id,
479						handler,
480						event: Either::Left(event),
481					}),
482				Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) =>
483					return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
484				Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) =>
485					return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
486				Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) =>
487					return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
488				Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) =>
489					return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
490				Poll::Ready(ToSwarm::ListenOn { opts }) =>
491					return Poll::Ready(ToSwarm::ListenOn { opts }),
492				Poll::Ready(ToSwarm::RemoveListener { id }) =>
493					return Poll::Ready(ToSwarm::RemoveListener { id }),
494			}
495		}
496
497		loop {
498			match self.identify.poll(cx, params) {
499				Poll::Pending => break,
500				Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
501					IdentifyEvent::Received { peer_id, info, .. } => {
502						self.handle_identify_report(&peer_id, &info);
503						let event = PeerInfoEvent::Identified { peer_id, info };
504						return Poll::Ready(ToSwarm::GenerateEvent(event))
505					},
506					IdentifyEvent::Error { peer_id, error } => {
507						debug!(target: "sub-libp2p", "Identification with peer {:?} failed => {}", peer_id, error)
508					},
509					IdentifyEvent::Pushed { .. } => {},
510					IdentifyEvent::Sent { .. } => {},
511				},
512				Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }),
513				Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) =>
514					return Poll::Ready(ToSwarm::NotifyHandler {
515						peer_id,
516						handler,
517						event: Either::Right(event),
518					}),
519				Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) =>
520					return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
521				Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) =>
522					return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
523				Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) =>
524					return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
525				Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) =>
526					return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
527				Poll::Ready(ToSwarm::ListenOn { opts }) =>
528					return Poll::Ready(ToSwarm::ListenOn { opts }),
529				Poll::Ready(ToSwarm::RemoveListener { id }) =>
530					return Poll::Ready(ToSwarm::RemoveListener { id }),
531			}
532		}
533
534		while let Poll::Ready(Some(())) = self.garbage_collect.poll_next_unpin(cx) {
535			self.nodes_info.retain(|_, node| {
536				node.info_expire.as_ref().map(|exp| *exp >= Instant::now()).unwrap_or(true)
537			});
538		}
539
540		Poll::Pending
541	}
542}