1use crate::utils::interval;
23use either::Either;
24
25use fnv::FnvHashMap;
26use futures::prelude::*;
27use libp2p::{
28 core::{ConnectedPoint, Endpoint},
29 identify::{
30 Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent,
31 Info as IdentifyInfo,
32 },
33 identity::PublicKey,
34 ping::{Behaviour as Ping, Config as PingConfig, Event as PingEvent},
35 swarm::{
36 behaviour::{
37 AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure,
38 ExternalAddrConfirmed, FromSwarm, ListenFailure,
39 },
40 ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, ConnectionId,
41 NetworkBehaviour, NewExternalAddrCandidate, PollParameters, THandler, THandlerInEvent,
42 THandlerOutEvent, ToSwarm,
43 },
44 Multiaddr, PeerId,
45};
46use log::{debug, error, trace};
47use parking_lot::Mutex;
48use smallvec::SmallVec;
49
50use std::{
51 collections::{hash_map::Entry, HashSet, VecDeque},
52 pin::Pin,
53 sync::Arc,
54 task::{Context, Poll},
55 time::{Duration, Instant},
56};
57
58const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60);
60const GARBAGE_COLLECT_INTERVAL: Duration = Duration::from_secs(2 * 60);
62
63pub struct PeerInfoBehaviour {
65 ping: Ping,
67 identify: Identify,
69 nodes_info: FnvHashMap<PeerId, NodeInfo>,
71 garbage_collect: Pin<Box<dyn Stream<Item = ()> + Send>>,
73 external_addresses: ExternalAddresses,
75 pending_actions: VecDeque<ToSwarm<PeerInfoEvent, THandlerInEvent<PeerInfoBehaviour>>>,
77}
78
79#[derive(Debug)]
81struct NodeInfo {
82 info_expire: Option<Instant>,
85 endpoints: SmallVec<[ConnectedPoint; crate::MAX_CONNECTIONS_PER_PEER]>,
87 client_version: Option<String>,
89 latest_ping: Option<Duration>,
91}
92
93impl NodeInfo {
94 fn new(endpoint: ConnectedPoint) -> Self {
95 let mut endpoints = SmallVec::new();
96 endpoints.push(endpoint);
97 Self { info_expire: None, endpoints, client_version: None, latest_ping: None }
98 }
99}
100
101#[derive(Debug, Clone, Default)]
103pub struct ExternalAddresses {
104 addresses: Arc<Mutex<HashSet<Multiaddr>>>,
105}
106
107impl ExternalAddresses {
108 pub fn add(&mut self, addr: Multiaddr) {
110 self.addresses.lock().insert(addr);
111 }
112
113 pub fn remove(&mut self, addr: &Multiaddr) {
115 self.addresses.lock().remove(addr);
116 }
117}
118
119impl PeerInfoBehaviour {
120 pub fn new(
122 user_agent: String,
123 local_public_key: PublicKey,
124 external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
125 ) -> Self {
126 let identify = {
127 let cfg = IdentifyConfig::new("/substrate/1.0".to_string(), local_public_key)
128 .with_agent_version(user_agent)
129 .with_cache_size(0);
131 Identify::new(cfg)
132 };
133
134 Self {
135 ping: Ping::new(PingConfig::new()),
136 identify,
137 nodes_info: FnvHashMap::default(),
138 garbage_collect: Box::pin(interval(GARBAGE_COLLECT_INTERVAL)),
139 external_addresses: ExternalAddresses { addresses: external_addresses },
140 pending_actions: Default::default(),
141 }
142 }
143
144 pub fn node(&self, peer_id: &PeerId) -> Option<Node> {
150 self.nodes_info.get(peer_id).map(Node)
151 }
152
153 fn handle_ping_report(
156 &mut self,
157 peer_id: &PeerId,
158 ping_time: Duration,
159 connection: ConnectionId,
160 ) {
161 trace!(target: "sub-libp2p", "Ping time with {:?} via {:?}: {:?}", peer_id, connection, ping_time);
162 if let Some(entry) = self.nodes_info.get_mut(peer_id) {
163 entry.latest_ping = Some(ping_time);
164 } else {
165 error!(target: "sub-libp2p",
166 "Received ping from node we're not connected to {:?} via {:?}", peer_id, connection);
167 }
168 }
169
170 fn handle_identify_report(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
173 trace!(target: "sub-libp2p", "Identified {:?} => {:?}", peer_id, info);
174 if let Some(entry) = self.nodes_info.get_mut(peer_id) {
175 entry.client_version = Some(info.agent_version.clone());
176 } else {
177 error!(target: "sub-libp2p",
178 "Received pong from node we're not connected to {:?}", peer_id);
179 }
180 }
181}
182
183pub struct Node<'a>(&'a NodeInfo);
185
186impl<'a> Node<'a> {
187 pub fn endpoint(&self) -> Option<&'a ConnectedPoint> {
191 self.0.endpoints.get(0)
192 }
193
194 pub fn client_version(&self) -> Option<&'a str> {
196 self.0.client_version.as_deref()
197 }
198
199 pub fn latest_ping(&self) -> Option<Duration> {
202 self.0.latest_ping
203 }
204}
205
206#[derive(Debug)]
208pub enum PeerInfoEvent {
209 Identified {
212 peer_id: PeerId,
214 info: IdentifyInfo,
216 },
217}
218
219impl NetworkBehaviour for PeerInfoBehaviour {
220 type ConnectionHandler = ConnectionHandlerSelect<
221 <Ping as NetworkBehaviour>::ConnectionHandler,
222 <Identify as NetworkBehaviour>::ConnectionHandler,
223 >;
224 type ToSwarm = PeerInfoEvent;
225
226 fn handle_pending_inbound_connection(
227 &mut self,
228 connection_id: ConnectionId,
229 local_addr: &Multiaddr,
230 remote_addr: &Multiaddr,
231 ) -> Result<(), ConnectionDenied> {
232 self.ping
233 .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
234 self.identify
235 .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
236 }
237
238 fn handle_pending_outbound_connection(
239 &mut self,
240 _connection_id: ConnectionId,
241 _maybe_peer: Option<PeerId>,
242 _addresses: &[Multiaddr],
243 _effective_role: Endpoint,
244 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
245 Ok(Vec::new())
248 }
249
250 fn handle_established_inbound_connection(
251 &mut self,
252 connection_id: ConnectionId,
253 peer: PeerId,
254 local_addr: &Multiaddr,
255 remote_addr: &Multiaddr,
256 ) -> Result<THandler<Self>, ConnectionDenied> {
257 let ping_handler = self.ping.handle_established_inbound_connection(
258 connection_id,
259 peer,
260 local_addr,
261 remote_addr,
262 )?;
263 let identify_handler = self.identify.handle_established_inbound_connection(
264 connection_id,
265 peer,
266 local_addr,
267 remote_addr,
268 )?;
269 Ok(ping_handler.select(identify_handler))
270 }
271
272 fn handle_established_outbound_connection(
273 &mut self,
274 connection_id: ConnectionId,
275 peer: PeerId,
276 addr: &Multiaddr,
277 role_override: Endpoint,
278 ) -> Result<THandler<Self>, ConnectionDenied> {
279 let ping_handler = self.ping.handle_established_outbound_connection(
280 connection_id,
281 peer,
282 addr,
283 role_override,
284 )?;
285 let identify_handler = self.identify.handle_established_outbound_connection(
286 connection_id,
287 peer,
288 addr,
289 role_override,
290 )?;
291 Ok(ping_handler.select(identify_handler))
292 }
293
294 fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
295 match event {
296 FromSwarm::ConnectionEstablished(
297 e @ ConnectionEstablished { peer_id, endpoint, .. },
298 ) => {
299 self.ping.on_swarm_event(FromSwarm::ConnectionEstablished(e));
300 self.identify.on_swarm_event(FromSwarm::ConnectionEstablished(e));
301
302 match self.nodes_info.entry(peer_id) {
303 Entry::Vacant(e) => {
304 e.insert(NodeInfo::new(endpoint.clone()));
305 },
306 Entry::Occupied(e) => {
307 let e = e.into_mut();
308 if e.info_expire.as_ref().map(|exp| *exp < Instant::now()).unwrap_or(false)
309 {
310 e.client_version = None;
311 e.latest_ping = None;
312 }
313 e.info_expire = None;
314 e.endpoints.push(endpoint.clone());
315 },
316 }
317 },
318 FromSwarm::ConnectionClosed(ConnectionClosed {
319 peer_id,
320 connection_id,
321 endpoint,
322 handler,
323 remaining_established,
324 }) => {
325 let (ping_handler, identity_handler) = handler.into_inner();
326 self.ping.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
327 peer_id,
328 connection_id,
329 endpoint,
330 handler: ping_handler,
331 remaining_established,
332 }));
333 self.identify.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
334 peer_id,
335 connection_id,
336 endpoint,
337 handler: identity_handler,
338 remaining_established,
339 }));
340
341 if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
342 if remaining_established == 0 {
343 entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
344 }
345 entry.endpoints.retain(|ep| ep != endpoint)
346 } else {
347 error!(target: "sub-libp2p",
348 "Unknown connection to {:?} closed: {:?}", peer_id, endpoint);
349 }
350 },
351 FromSwarm::DialFailure(DialFailure { peer_id, error, connection_id }) => {
352 self.ping.on_swarm_event(FromSwarm::DialFailure(DialFailure {
353 peer_id,
354 error,
355 connection_id,
356 }));
357 self.identify.on_swarm_event(FromSwarm::DialFailure(DialFailure {
358 peer_id,
359 error,
360 connection_id,
361 }));
362 },
363 FromSwarm::ListenerClosed(e) => {
364 self.ping.on_swarm_event(FromSwarm::ListenerClosed(e));
365 self.identify.on_swarm_event(FromSwarm::ListenerClosed(e));
366 },
367 FromSwarm::ListenFailure(ListenFailure {
368 local_addr,
369 send_back_addr,
370 error,
371 connection_id,
372 }) => {
373 self.ping.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
374 local_addr,
375 send_back_addr,
376 error,
377 connection_id,
378 }));
379 self.identify.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
380 local_addr,
381 send_back_addr,
382 error,
383 connection_id,
384 }));
385 },
386 FromSwarm::ListenerError(e) => {
387 self.ping.on_swarm_event(FromSwarm::ListenerError(e));
388 self.identify.on_swarm_event(FromSwarm::ListenerError(e));
389 },
390 FromSwarm::ExternalAddrExpired(e) => {
391 self.ping.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
392 self.identify.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
393 },
394 FromSwarm::NewListener(e) => {
395 self.ping.on_swarm_event(FromSwarm::NewListener(e));
396 self.identify.on_swarm_event(FromSwarm::NewListener(e));
397 },
398 FromSwarm::ExpiredListenAddr(e) => {
399 self.ping.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
400 self.identify.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
401 self.external_addresses.remove(e.addr);
402 },
403 FromSwarm::NewExternalAddrCandidate(e @ NewExternalAddrCandidate { addr }) => {
404 self.ping.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
405 self.identify.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
406
407 self.pending_actions.push_back(ToSwarm::ExternalAddrConfirmed(addr.clone()));
415 },
416 FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
417 self.ping.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
418 self.identify.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
419 self.external_addresses.add(addr.clone());
420 },
421 FromSwarm::AddressChange(e @ AddressChange { peer_id, old, new, .. }) => {
422 self.ping.on_swarm_event(FromSwarm::AddressChange(e));
423 self.identify.on_swarm_event(FromSwarm::AddressChange(e));
424
425 if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
426 if let Some(endpoint) = entry.endpoints.iter_mut().find(|e| e == &old) {
427 *endpoint = new.clone();
428 } else {
429 error!(target: "sub-libp2p",
430 "Unknown address change for peer {:?} from {:?} to {:?}", peer_id, old, new);
431 }
432 } else {
433 error!(target: "sub-libp2p",
434 "Unknown peer {:?} to change address from {:?} to {:?}", peer_id, old, new);
435 }
436 },
437 FromSwarm::NewListenAddr(e) => {
438 self.ping.on_swarm_event(FromSwarm::NewListenAddr(e));
439 self.identify.on_swarm_event(FromSwarm::NewListenAddr(e));
440 },
441 }
442 }
443
444 fn on_connection_handler_event(
445 &mut self,
446 peer_id: PeerId,
447 connection_id: ConnectionId,
448 event: THandlerOutEvent<Self>,
449 ) {
450 match event {
451 Either::Left(event) =>
452 self.ping.on_connection_handler_event(peer_id, connection_id, event),
453 Either::Right(event) =>
454 self.identify.on_connection_handler_event(peer_id, connection_id, event),
455 }
456 }
457
458 fn poll(
459 &mut self,
460 cx: &mut Context,
461 params: &mut impl PollParameters,
462 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
463 if let Some(event) = self.pending_actions.pop_front() {
464 return Poll::Ready(event)
465 }
466
467 loop {
468 match self.ping.poll(cx, params) {
469 Poll::Pending => break,
470 Poll::Ready(ToSwarm::GenerateEvent(ev)) => {
471 if let PingEvent { peer, result: Ok(rtt), connection } = ev {
472 self.handle_ping_report(&peer, rtt, connection)
473 }
474 },
475 Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }),
476 Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) =>
477 return Poll::Ready(ToSwarm::NotifyHandler {
478 peer_id,
479 handler,
480 event: Either::Left(event),
481 }),
482 Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) =>
483 return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
484 Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) =>
485 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
486 Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) =>
487 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
488 Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) =>
489 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
490 Poll::Ready(ToSwarm::ListenOn { opts }) =>
491 return Poll::Ready(ToSwarm::ListenOn { opts }),
492 Poll::Ready(ToSwarm::RemoveListener { id }) =>
493 return Poll::Ready(ToSwarm::RemoveListener { id }),
494 }
495 }
496
497 loop {
498 match self.identify.poll(cx, params) {
499 Poll::Pending => break,
500 Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
501 IdentifyEvent::Received { peer_id, info, .. } => {
502 self.handle_identify_report(&peer_id, &info);
503 let event = PeerInfoEvent::Identified { peer_id, info };
504 return Poll::Ready(ToSwarm::GenerateEvent(event))
505 },
506 IdentifyEvent::Error { peer_id, error } => {
507 debug!(target: "sub-libp2p", "Identification with peer {:?} failed => {}", peer_id, error)
508 },
509 IdentifyEvent::Pushed { .. } => {},
510 IdentifyEvent::Sent { .. } => {},
511 },
512 Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }),
513 Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) =>
514 return Poll::Ready(ToSwarm::NotifyHandler {
515 peer_id,
516 handler,
517 event: Either::Right(event),
518 }),
519 Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) =>
520 return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
521 Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) =>
522 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
523 Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) =>
524 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
525 Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) =>
526 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
527 Poll::Ready(ToSwarm::ListenOn { opts }) =>
528 return Poll::Ready(ToSwarm::ListenOn { opts }),
529 Poll::Ready(ToSwarm::RemoveListener { id }) =>
530 return Poll::Ready(ToSwarm::RemoveListener { id }),
531 }
532 }
533
534 while let Poll::Ready(Some(())) = self.garbage_collect.poll_next_unpin(cx) {
535 self.nodes_info.retain(|_, node| {
536 node.info_expire.as_ref().map(|exp| *exp >= Instant::now()).unwrap_or(true)
537 });
538 }
539
540 Poll::Pending
541 }
542}