1use crate::{
22 config::{
23 FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24 SetConfig, TransportConfig,
25 },
26 error::Error,
27 event::{DhtEvent, Event},
28 litep2p::{
29 discovery::{Discovery, DiscoveryEvent},
30 peerstore::Peerstore,
31 service::{Litep2pNetworkService, NetworkServiceCommand},
32 shim::{
33 bitswap::BitswapServer,
34 notification::{
35 config::{NotificationProtocolConfig, ProtocolControlHandle},
36 peerset::PeersetCommand,
37 },
38 request_response::{RequestResponseConfig, RequestResponseProtocol},
39 },
40 },
41 peer_store::PeerStoreProvider,
42 service::{
43 metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
44 out_events,
45 traits::{BandwidthSink, NetworkBackend, NetworkService},
46 },
47 NetworkStatus, NotificationService, ProtocolName,
48};
49
50use codec::Encode;
51use futures::StreamExt;
52use libp2p::kad::{PeerRecord, Record as P2PRecord, RecordKey};
53use litep2p::{
54 config::ConfigBuilder,
55 crypto::ed25519::Keypair,
56 error::{DialError, NegotiationError},
57 executor::Executor,
58 protocol::{
59 libp2p::{
60 bitswap::Config as BitswapConfig,
61 kademlia::{QueryId, Record},
62 },
63 request_response::ConfigBuilder as RequestResponseConfigBuilder,
64 },
65 transport::{
66 tcp::config::Config as TcpTransportConfig,
67 websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
68 },
69 types::{
70 multiaddr::{Multiaddr, Protocol},
71 ConnectionId,
72 },
73 Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
74};
75use prometheus_endpoint::Registry;
76
77use sc_client_api::BlockBackend;
78use sc_network_common::{role::Roles, ExHashT};
79use sc_network_types::PeerId;
80use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
81use sp_runtime::traits::Block as BlockT;
82
83use std::{
84 cmp,
85 collections::{hash_map::Entry, HashMap, HashSet},
86 fs,
87 future::Future,
88 iter,
89 pin::Pin,
90 sync::{
91 atomic::{AtomicUsize, Ordering},
92 Arc,
93 },
94 time::{Duration, Instant},
95};
96
97mod discovery;
98mod peerstore;
99mod service;
100mod shim;
101
/// Keep-alive timeout handed to the litep2p `ConfigBuilder` via `with_keep_alive_timeout()`.
const KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
104
/// Adapter that exposes a `litep2p::BandwidthSink` through the crate's `BandwidthSink` trait.
struct Litep2pBandwidthSink {
    /// Underlying litep2p sink the inbound/outbound totals are read from.
    sink: litep2p::BandwidthSink,
}
109
110impl BandwidthSink for Litep2pBandwidthSink {
111 fn total_inbound(&self) -> u64 {
112 self.sink.inbound() as u64
113 }
114
115 fn total_outbound(&self) -> u64 {
116 self.sink.outbound() as u64
117 }
118}
119
/// Task executor given to litep2p; it forwards every future to a boxed closure.
struct Litep2pExecutor {
    /// Closure invoked with each boxed future that should be run.
    executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
}
125
126impl Executor for Litep2pExecutor {
127 fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
128 (self.executor)(future)
129 }
130
131 fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
132 (self.executor)(future)
133 }
134}
135
/// Target string passed to the `log` macros in this file.
const LOG_TARGET: &str = "sub-libp2p";
138
/// Per-peer connection bookkeeping stored in `Litep2pNetworkBackend::peers`.
struct ConnectionContext {
    /// Endpoint of every tracked connection to the peer, keyed by connection ID.
    endpoints: HashMap<ConnectionId, Endpoint>,

    /// Number of tracked connections to the peer.
    num_connections: usize,
}
147
/// Networking backend built on top of `litep2p`.
pub struct Litep2pNetworkBackend {
    /// Main `litep2p` instance; polled for connection and dial events in `run()`.
    litep2p: Litep2p,

    /// Service handle returned by `network_service()`.
    network_service: Arc<dyn NetworkService>,

    /// Receiver for the `NetworkServiceCommand`s processed in `run()`.
    cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,

    /// Control handles of the notification protocols, keyed by protocol name.
    peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

    /// In-flight `GET_VALUE` queries: record key and start time, keyed by query ID.
    pending_get_values: HashMap<QueryId, (RecordKey, Instant)>,

    /// In-flight `PUT_VALUE` queries: record key and start time, keyed by query ID.
    pending_put_values: HashMap<QueryId, (RecordKey, Instant)>,

    /// Discovery event stream; also used to issue DHT queries.
    discovery: Discovery,

    /// Connected-peer count refreshed on every iteration of `run()` and shared with the
    /// metric sources registered in `new()`.
    num_connected: Arc<AtomicUsize>,

    /// Per-peer connection context, maintained from litep2p connection events.
    peers: HashMap<litep2p::PeerId, ConnectionContext>,

    /// Handle to the peer store.
    peerstore_handle: Arc<dyn PeerStoreProvider>,

    /// Name of the block announce protocol; its handle provides the connected-peer count.
    block_announce_protocol: ProtocolName,

    /// Output channels that network `Event`s are sent to.
    event_streams: out_events::OutChannels,

    /// Prometheus metrics; `None` when no metrics registry was provided.
    metrics: Option<Metrics>,
}
189
190impl Litep2pNetworkBackend {
191 fn parse_addresses(
194 addresses: impl Iterator<Item = Multiaddr>,
195 ) -> HashMap<PeerId, Vec<Multiaddr>> {
196 addresses
197 .into_iter()
198 .filter_map(|address| match address.iter().next() {
199 Some(
200 Protocol::Dns(_) |
201 Protocol::Dns4(_) |
202 Protocol::Dns6(_) |
203 Protocol::Ip6(_) |
204 Protocol::Ip4(_),
205 ) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
206 {
207 Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
208 .map_or(None, |peer| Some((peer, Some(address)))),
209 _ => None,
210 },
211 Some(Protocol::P2p(multihash)) =>
212 PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None))),
213 _ => None,
214 })
215 .fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
216 let entry = acc.entry(peer).or_default();
217 maybe_address.map(|address| entry.push(address));
218
219 acc
220 })
221 }
222
223 fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
225 Self::parse_addresses(peers.into_iter())
226 .into_iter()
227 .filter_map(|(peer, addresses)| {
228 if addresses.is_empty() {
230 return Some(peer)
231 }
232
233 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
234 log::warn!(
235 target: LOG_TARGET,
236 "couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
237 );
238 return None
239 }
240
241 self.peerstore_handle.add_known_peer(peer);
242 Some(peer)
243 })
244 .collect()
245 }
246}
247
248impl Litep2pNetworkBackend {
249 fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
251 let secret: litep2p::crypto::ed25519::SecretKey =
252 node_key.clone().into_keypair()?.secret().into();
253
254 let local_identity = Keypair::from(secret);
255 let local_public = local_identity.public();
256 let local_peer_id = local_public.to_peer_id();
257
258 Ok((local_identity, local_peer_id))
259 }
260
261 fn configure_transport<B: BlockT + 'static, H: ExHashT>(
263 config: &FullNetworkConfiguration<B, H, Self>,
264 ) -> ConfigBuilder {
265 let _ = match config.network_config.transport {
266 TransportConfig::MemoryOnly => panic!("memory transport not supported"),
267 TransportConfig::Normal { .. } => false,
268 };
269 let config_builder = ConfigBuilder::new();
270
271 let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
272 .network_config
273 .listen_addresses
274 .iter()
275 .filter_map(|address| {
276 use sc_network_types::multiaddr::Protocol;
277
278 let mut iter = address.iter();
279
280 match iter.next() {
281 Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
282 protocol => {
283 log::error!(
284 target: LOG_TARGET,
285 "unknown protocol {protocol:?}, ignoring {address:?}",
286 );
287
288 return None
289 },
290 }
291
292 match iter.next() {
293 Some(Protocol::Tcp(_)) => match iter.next() {
294 Some(Protocol::Ws(_) | Protocol::Wss(_)) =>
295 Some((None, Some(address.clone()))),
296 Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
297 protocol => {
298 log::error!(
299 target: LOG_TARGET,
300 "unknown protocol {protocol:?}, ignoring {address:?}",
301 );
302 None
303 },
304 },
305 protocol => {
306 log::error!(
307 target: LOG_TARGET,
308 "unknown protocol {protocol:?}, ignoring {address:?}",
309 );
310 None
311 },
312 }
313 })
314 .unzip();
315
316 config_builder
317 .with_websocket(WebSocketTransportConfig {
318 listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
319 yamux_config: litep2p::yamux::Config::default(),
320 nodelay: true,
321 ..Default::default()
322 })
323 .with_tcp(TcpTransportConfig {
324 listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
325 yamux_config: litep2p::yamux::Config::default(),
326 nodelay: true,
327 ..Default::default()
328 })
329 }
330}
331
332#[async_trait::async_trait]
333impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
334 type NotificationProtocolConfig = NotificationProtocolConfig;
335 type RequestResponseProtocolConfig = RequestResponseConfig;
336 type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
337 type PeerStore = Peerstore;
338 type BitswapConfig = BitswapConfig;
339
340 fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
341 where
342 Self: Sized,
343 {
344 let (keypair, local_peer_id) =
345 Self::get_keypair(¶ms.network_config.network_config.node_key)?;
346 let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
347
348 params.network_config.network_config.boot_nodes = params
349 .network_config
350 .network_config
351 .boot_nodes
352 .into_iter()
353 .filter(|boot_node| boot_node.peer_id != local_peer_id.into())
354 .collect();
355 params.network_config.network_config.default_peers_set.reserved_nodes = params
356 .network_config
357 .network_config
358 .default_peers_set
359 .reserved_nodes
360 .into_iter()
361 .filter(|reserved_node| {
362 if reserved_node.peer_id == local_peer_id.into() {
363 log::warn!(
364 target: LOG_TARGET,
365 "Local peer ID used in reserved node, ignoring: {reserved_node}",
366 );
367 false
368 } else {
369 true
370 }
371 })
372 .collect();
373
374 if let Some(path) = ¶ms.network_config.network_config.net_config_path {
375 fs::create_dir_all(path)?;
376 }
377
378 log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
379 log::info!(target: LOG_TARGET, "Running litep2p network backend");
380
381 params.network_config.sanity_check_addresses()?;
382 params.network_config.sanity_check_bootnodes()?;
383
384 let mut config_builder =
385 Self::configure_transport(¶ms.network_config).with_keypair(keypair.clone());
386 let known_addresses = params.network_config.known_addresses();
387 let peer_store_handle = params.network_config.peer_store_handle();
388 let executor = Arc::new(Litep2pExecutor { executor: params.executor });
389
390 let FullNetworkConfiguration {
391 notification_protocols,
392 request_response_protocols,
393 network_config,
394 ..
395 } = params.network_config;
396
397 let block_announce_protocol = params.block_announce_config.protocol_name().clone();
403 let mut notif_protocols = HashMap::from_iter([(
404 params.block_announce_config.protocol_name().clone(),
405 params.block_announce_config.handle,
406 )]);
407
408 config_builder = notification_protocols
410 .into_iter()
411 .fold(config_builder, |config_builder, mut config| {
412 config.config.set_handshake(Roles::from(¶ms.role).encode());
413 notif_protocols.insert(config.protocol_name, config.handle);
414
415 config_builder.with_notification_protocol(config.config)
416 })
417 .with_notification_protocol(params.block_announce_config.config);
418
419 let metrics = match ¶ms.metrics_registry {
421 Some(registry) => Some(register_without_sources(registry)?),
422 None => None,
423 };
424
425 let (mut request_response_receivers, request_response_senders): (
431 HashMap<_, _>,
432 HashMap<_, _>,
433 ) = request_response_protocols
434 .iter()
435 .map(|config| {
436 let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
437 ((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
438 })
439 .unzip();
440
441 config_builder = request_response_protocols.into_iter().fold(
442 config_builder,
443 |config_builder, config| {
444 let (protocol_config, handle) = RequestResponseConfigBuilder::new(
445 Litep2pProtocolName::from(config.protocol_name.clone()),
446 )
447 .with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
448 .with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
449 .with_timeout(config.request_timeout)
450 .build();
451
452 let protocol = RequestResponseProtocol::new(
453 config.protocol_name.clone(),
454 handle,
455 Arc::clone(&peer_store_handle),
456 config.inbound_queue,
457 request_response_receivers
458 .remove(&config.protocol_name)
459 .expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
460 request_response_senders.clone(),
461 metrics.clone(),
462 );
463
464 executor.run(Box::pin(async move {
465 protocol.run().await;
466 }));
467
468 config_builder.with_request_response_protocol(protocol_config)
469 },
470 );
471
472 let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
474 known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
475 use sc_network_types::multiaddr::Protocol;
476
477 let address = match address.iter().last() {
478 Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) =>
479 address.with(Protocol::P2p(peer.into())),
480 Some(Protocol::P2p(_)) => address,
481 _ => return acc,
482 };
483
484 acc.entry(peer.into()).or_default().push(address.into());
485 peer_store_handle.add_known_peer(peer);
486
487 acc
488 });
489
490 let listen_addresses = Arc::new(Default::default());
492 let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
493 Discovery::new(
494 local_peer_id,
495 &network_config,
496 params.genesis_hash,
497 params.fork_id.as_deref(),
498 ¶ms.protocol_id,
499 known_addresses.clone(),
500 Arc::clone(&listen_addresses),
501 Arc::clone(&peer_store_handle),
502 );
503
504 config_builder = config_builder
505 .with_known_addresses(known_addresses.clone().into_iter())
506 .with_libp2p_ping(ping_config)
507 .with_libp2p_identify(identify_config)
508 .with_libp2p_kademlia(kademlia_config)
509 .with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
510 Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
511 ))
512 .with_keep_alive_timeout(KEEP_ALIVE_TIMEOUT)
515 .with_executor(executor);
516
517 if let Some(config) = maybe_mdns_config {
518 config_builder = config_builder.with_mdns(config);
519 }
520
521 if let Some(config) = params.bitswap_config {
522 config_builder = config_builder.with_libp2p_bitswap(config);
523 }
524
525 let litep2p =
526 Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
527
528 litep2p.listen_addresses().for_each(|address| {
529 log::debug!(target: LOG_TARGET, "listening on: {address}");
530
531 listen_addresses.write().insert(address.clone());
532 });
533
534 let public_addresses = litep2p.public_addresses();
535 for address in network_config.public_addresses.iter() {
536 if let Err(err) = public_addresses.add_address(address.clone().into()) {
537 log::warn!(
538 target: LOG_TARGET,
539 "failed to add public address {address:?}: {err:?}",
540 );
541 }
542 }
543
544 let network_service = Arc::new(Litep2pNetworkService::new(
545 local_peer_id,
546 keypair.clone(),
547 cmd_tx,
548 Arc::clone(&peer_store_handle),
549 notif_protocols.clone(),
550 block_announce_protocol.clone(),
551 request_response_senders,
552 Arc::clone(&listen_addresses),
553 public_addresses,
554 ));
555
556 let num_connected = Arc::new(Default::default());
558 let bandwidth: Arc<dyn BandwidthSink> =
559 Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
560
561 if let Some(registry) = ¶ms.metrics_registry {
562 MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
563 }
564
565 Ok(Self {
566 network_service,
567 cmd_rx,
568 metrics,
569 peerset_handles: notif_protocols,
570 num_connected,
571 discovery,
572 pending_put_values: HashMap::new(),
573 pending_get_values: HashMap::new(),
574 peerstore_handle: peer_store_handle,
575 block_announce_protocol,
576 event_streams: out_events::OutChannels::new(None)?,
577 peers: HashMap::new(),
578 litep2p,
579 })
580 }
581
/// Return a new handle to the network service created in `new()`.
fn network_service(&self) -> Arc<dyn NetworkService> {
    Arc::clone(&self.network_service)
}
585
586 fn peer_store(
587 bootnodes: Vec<sc_network_types::PeerId>,
588 metrics_registry: Option<Registry>,
589 ) -> Self::PeerStore {
590 Peerstore::new(bootnodes, metrics_registry)
591 }
592
/// Create the notification metrics from an optional Prometheus registry.
fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
    NotificationMetrics::new(registry)
}
596
/// Create the Bitswap server for `client`.
///
/// Returns the future produced by `BitswapServer::new` together with the Bitswap
/// configuration that `new()` passes to `litep2p`.
fn bitswap_server(
    client: Arc<dyn BlockBackend<B> + Send + Sync>,
) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
    BitswapServer::new(client)
}
603
604 fn notification_config(
606 protocol_name: ProtocolName,
607 fallback_names: Vec<ProtocolName>,
608 max_notification_size: u64,
609 handshake: Option<NotificationHandshake>,
610 set_config: SetConfig,
611 metrics: NotificationMetrics,
612 peerstore_handle: Arc<dyn PeerStoreProvider>,
613 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
614 Self::NotificationProtocolConfig::new(
615 protocol_name,
616 fallback_names,
617 max_notification_size as usize,
618 handshake,
619 set_config,
620 metrics,
621 peerstore_handle,
622 )
623 }
624
625 fn request_response_config(
627 protocol_name: ProtocolName,
628 fallback_names: Vec<ProtocolName>,
629 max_request_size: u64,
630 max_response_size: u64,
631 request_timeout: Duration,
632 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
633 ) -> Self::RequestResponseProtocolConfig {
634 Self::RequestResponseProtocolConfig::new(
635 protocol_name,
636 fallback_names,
637 max_request_size,
638 max_response_size,
639 request_timeout,
640 inbound_queue,
641 )
642 }
643
644 async fn run(mut self) {
646 log::debug!(target: LOG_TARGET, "starting litep2p network backend");
647
648 loop {
649 let num_connected_peers = self
650 .peerset_handles
651 .get(&self.block_announce_protocol)
652 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
653 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
654
655 tokio::select! {
656 command = self.cmd_rx.next() => match command {
657 None => return,
658 Some(command) => match command {
659 NetworkServiceCommand::GetValue{ key } => {
660 let query_id = self.discovery.get_value(key.clone()).await;
661 self.pending_get_values.insert(query_id, (key, Instant::now()));
662 }
663 NetworkServiceCommand::PutValue { key, value } => {
664 let query_id = self.discovery.put_value(key.clone(), value).await;
665 self.pending_put_values.insert(query_id, (key, Instant::now()));
666 }
667 NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
668 let kademlia_key = record.key.to_vec().into();
669 let query_id = self.discovery.put_value_to_peers(record, peers, update_local_storage).await;
670 self.pending_put_values.insert(query_id, (kademlia_key, Instant::now()));
671 }
672
673 NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
674 self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
675 }
676 NetworkServiceCommand::EventStream { tx } => {
677 self.event_streams.push(tx);
678 }
679 NetworkServiceCommand::Status { tx } => {
680 let _ = tx.send(NetworkStatus {
681 num_connected_peers: self
682 .peerset_handles
683 .get(&self.block_announce_protocol)
684 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
685 total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
686 total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
687 });
688 }
689 NetworkServiceCommand::AddPeersToReservedSet {
690 protocol,
691 peers,
692 } => {
693 let peers = self.add_addresses(peers.into_iter().map(Into::into));
694
695 match self.peerset_handles.get(&protocol) {
696 Some(handle) => {
697 let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
698 }
699 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
700 };
701 }
702 NetworkServiceCommand::AddKnownAddress { peer, address } => {
703 let mut address: Multiaddr = address.into();
704
705 if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
706 address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
707 }
708
709 if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) == 0usize {
710 log::debug!(
711 target: LOG_TARGET,
712 "couldn't add known address ({address}) for {peer:?}, unsupported transport"
713 );
714 }
715 },
716 NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
717 let peers = self.add_addresses(peers.into_iter().map(Into::into));
718
719 match self.peerset_handles.get(&protocol) {
720 Some(handle) => {
721 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
722 }
723 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
724 }
725
726 },
727 NetworkServiceCommand::DisconnectPeer {
728 protocol,
729 peer,
730 } => {
731 let Some(handle) = self.peerset_handles.get(&protocol) else {
732 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
733 continue
734 };
735
736 let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
737 }
738 NetworkServiceCommand::SetReservedOnly {
739 protocol,
740 reserved_only,
741 } => {
742 let Some(handle) = self.peerset_handles.get(&protocol) else {
743 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
744 continue
745 };
746
747 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
748 }
749 NetworkServiceCommand::RemoveReservedPeers {
750 protocol,
751 peers,
752 } => {
753 let Some(handle) = self.peerset_handles.get(&protocol) else {
754 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
755 continue
756 };
757
758 let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
759 }
760 }
761 },
762 event = self.discovery.next() => match event {
763 None => return,
764 Some(DiscoveryEvent::Discovered { addresses }) => {
765 for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
767 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
768 self.peerstore_handle.add_known_peer(peer);
769 }
770 }
771 }
772 Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
773 for peer in peers {
774 self.peerstore_handle.add_known_peer(peer.into());
775 }
776 }
777 Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
778 if !self.pending_get_values.contains_key(&query_id) {
779 log::error!(
780 target: LOG_TARGET,
781 "Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
782 );
783
784 continue
785 }
786
787 let peer_id: sc_network_types::PeerId = record.peer.into();
788 let record = PeerRecord {
789 record: P2PRecord {
790 key: record.record.key.to_vec().into(),
791 value: record.record.value,
792 publisher: record.record.publisher.map(|peer_id| {
793 let peer_id: sc_network_types::PeerId = peer_id.into();
794 peer_id.into()
795 }),
796 expires: record.record.expires,
797 },
798 peer: Some(peer_id.into()),
799 };
800
801 self.event_streams.send(
802 Event::Dht(
803 DhtEvent::ValueFound(
804 record.into()
805 )
806 )
807 );
808 }
809 Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
810 match self.pending_get_values.remove(&query_id) {
811 Some((key, started)) => {
812 log::trace!(
813 target: LOG_TARGET,
814 "`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
815 );
816
817 if let Some(ref metrics) = self.metrics {
818 metrics
819 .kademlia_query_duration
820 .with_label_values(&["value-get"])
821 .observe(started.elapsed().as_secs_f64());
822 }
823 },
824 None => {
825 log::error!(
826 target: LOG_TARGET,
827 "Missing/invalid pending query for `GET_VALUE`: {query_id:?}"
828 );
829 debug_assert!(false);
830 },
831 }
832 }
833 Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
834 match self.pending_put_values.remove(&query_id) {
835 None => log::warn!(
836 target: LOG_TARGET,
837 "`PUT_VALUE` succeeded for a non-existent query",
838 ),
839 Some((key, started)) => {
840 log::trace!(
841 target: LOG_TARGET,
842 "`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
843 );
844
845 self.event_streams.send(Event::Dht(
846 DhtEvent::ValuePut(libp2p::kad::RecordKey::new(&key))
847 ));
848
849 if let Some(ref metrics) = self.metrics {
850 metrics
851 .kademlia_query_duration
852 .with_label_values(&["value-put"])
853 .observe(started.elapsed().as_secs_f64());
854 }
855 }
856 }
857 }
858 Some(DiscoveryEvent::QueryFailed { query_id }) => {
859 match self.pending_get_values.remove(&query_id) {
860 None => match self.pending_put_values.remove(&query_id) {
861 None => log::warn!(
862 target: LOG_TARGET,
863 "non-existent query failed ({query_id:?})",
864 ),
865 Some((key, started)) => {
866 log::debug!(
867 target: LOG_TARGET,
868 "`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
869 );
870
871 self.event_streams.send(Event::Dht(
872 DhtEvent::ValuePutFailed(libp2p::kad::RecordKey::new(&key))
873 ));
874
875 if let Some(ref metrics) = self.metrics {
876 metrics
877 .kademlia_query_duration
878 .with_label_values(&["value-put-failed"])
879 .observe(started.elapsed().as_secs_f64());
880 }
881 }
882 }
883 Some((key, started)) => {
884 log::debug!(
885 target: LOG_TARGET,
886 "`GET_VALUE` ({query_id:?}) failed for key {key:?}",
887 );
888
889 self.event_streams.send(Event::Dht(
890 DhtEvent::ValueNotFound(libp2p::kad::RecordKey::new(&key))
891 ));
892
893 if let Some(ref metrics) = self.metrics {
894 metrics
895 .kademlia_query_duration
896 .with_label_values(&["value-get-failed"])
897 .observe(started.elapsed().as_secs_f64());
898 }
899 }
900 }
901 }
902 Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
903 self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
904 }
905 Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
906 match self.litep2p.public_addresses().add_address(address.clone().into()) {
907 Ok(inserted) => if inserted {
908 log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
909 },
910 Err(err) => {
911 log::warn!(
912 target: LOG_TARGET,
913 "🔍 Failed to add discovered external address {address:?}: {err:?}",
914 );
915 },
916 }
917 }
918 Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
919 let local_peer_id = self.litep2p.local_peer_id();
920
921 let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
923 address.with(Protocol::P2p(*local_peer_id.as_ref()))
924 } else {
925 address
926 };
927
928 if self.litep2p.public_addresses().remove_address(&address) {
929 log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
930 } else {
931 log::warn!(
932 target: LOG_TARGET,
933 "🔍 Failed to remove expired external address {address:?}"
934 );
935 }
936 }
937 Some(DiscoveryEvent::Ping { peer, rtt }) => {
938 log::trace!(
939 target: LOG_TARGET,
940 "ping time with {peer:?}: {rtt:?}",
941 );
942 }
943 Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
944 self.event_streams.send(Event::Dht(
945 DhtEvent::PutRecordRequest(
946 libp2p::kad::RecordKey::new(&key),
947 value,
948 publisher.map(Into::into),
949 expires,
950 )
951 ));
952 },
953
954 Some(DiscoveryEvent::RandomKademliaStarted) => {
955 if let Some(metrics) = self.metrics.as_ref() {
956 metrics.kademlia_random_queries_total.inc();
957 }
958 }
959 },
960 event = self.litep2p.next_event() => match event {
961 Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
962 let Some(metrics) = &self.metrics else {
963 continue;
964 };
965
966 let direction = match endpoint {
967 Endpoint::Dialer { .. } => "out",
968 Endpoint::Listener { .. } => {
969 metrics.incoming_connections_total.inc();
974
975 "in"
976 },
977 };
978 metrics.connections_opened_total.with_label_values(&[direction]).inc();
979
980 match self.peers.entry(peer) {
981 Entry::Vacant(entry) => {
982 entry.insert(ConnectionContext {
983 endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
984 num_connections: 1usize,
985 });
986 metrics.distinct_peers_connections_opened_total.inc();
987 }
988 Entry::Occupied(entry) => {
989 let entry = entry.into_mut();
990 entry.num_connections += 1;
991 entry.endpoints.insert(endpoint.connection_id(), endpoint);
992 }
993 }
994 }
995 Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
996 let Some(metrics) = &self.metrics else {
997 continue;
998 };
999
1000 let Some(context) = self.peers.get_mut(&peer) else {
1001 log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1002 continue
1003 };
1004
1005 let direction = match context.endpoints.remove(&connection_id) {
1006 None => {
1007 log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1008 continue
1009 }
1010 Some(endpoint) => {
1011 context.num_connections -= 1;
1012
1013 match endpoint {
1014 Endpoint::Dialer { .. } => "out",
1015 Endpoint::Listener { .. } => "in",
1016 }
1017 }
1018 };
1019
1020 metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1021
1022 if context.num_connections == 0 {
1023 self.peers.remove(&peer);
1024 metrics.distinct_peers_connections_closed_total.inc();
1025 }
1026 }
1027 Some(Litep2pEvent::DialFailure { address, error }) => {
1028 log::debug!(
1029 target: LOG_TARGET,
1030 "failed to dial peer at {address:?}: {error:?}",
1031 );
1032
1033 if let Some(metrics) = &self.metrics {
1034 let reason = match error {
1035 DialError::Timeout => "timeout",
1036 DialError::AddressError(_) => "invalid-address",
1037 DialError::DnsError(_) => "cannot-resolve-dns",
1038 DialError::NegotiationError(error) => match error {
1039 NegotiationError::Timeout => "timeout",
1040 NegotiationError::PeerIdMissing => "missing-peer-id",
1041 NegotiationError::StateMismatch => "state-mismatch",
1042 NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1043 NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1044 NegotiationError::SnowError(_) => "noise-error",
1045 NegotiationError::ParseError(_) => "parse-error",
1046 NegotiationError::IoError(_) => "io-error",
1047 NegotiationError::WebSocket(_) => "webscoket-error",
1048 NegotiationError::BadSignature => "bad-signature",
1049 }
1050 };
1051
1052 metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1053 }
1054 }
1055 Some(Litep2pEvent::ListDialFailures { errors }) => {
1056 log::debug!(
1057 target: LOG_TARGET,
1058 "failed to dial peer on multiple addresses {errors:?}",
1059 );
1060
1061 if let Some(metrics) = &self.metrics {
1062 metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1063 }
1064 }
1065 None => {
1066 log::error!(
1067 target: LOG_TARGET,
1068 "Litep2p backend terminated"
1069 );
1070 return
1071 }
1072 },
1073 }
1074 }
1075 }
1076}