libp2p_kad/behaviour.rs
1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23mod test;
24
25use std::{
26 collections::{BTreeMap, HashMap, HashSet, VecDeque},
27 fmt,
28 num::NonZeroUsize,
29 task::{Context, Poll, Waker},
30 time::Duration,
31 vec,
32};
33
34use fnv::FnvHashSet;
35use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
36use libp2p_identity::PeerId;
37use libp2p_swarm::{
38 behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
39 dial_opts::{self, DialOpts},
40 ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
41 ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent,
42 THandlerOutEvent, ToSwarm,
43};
44use thiserror::Error;
45use tracing::Level;
46use web_time::Instant;
47
48pub use crate::query::QueryStats;
49use crate::{
50 addresses::Addresses,
51 bootstrap,
52 handler::{Handler, HandlerEvent, HandlerIn, RequestId},
53 jobs::*,
54 kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus},
55 protocol,
56 protocol::{ConnectionType, KadPeer, ProtocolConfig},
57 query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState},
58 record::{
59 self,
60 store::{self, RecordStore},
61 ProviderRecord, Record,
62 },
63 K_VALUE,
64};
65
66/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
67/// Kademlia protocol.
68pub struct Behaviour<TStore> {
69 /// The Kademlia routing table.
70 kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
71
72 /// The k-bucket insertion strategy.
73 kbucket_inserts: BucketInserts,
74
75 /// Configuration of the wire protocol.
76 protocol_config: ProtocolConfig,
77
78 /// Configuration of [`RecordStore`] filtering.
79 record_filtering: StoreInserts,
80
81 /// The currently active (i.e. in-progress) queries.
82 queries: QueryPool,
83
84 /// The currently connected peers.
85 ///
86 /// This is a superset of the connected peers currently in the routing table.
87 connected_peers: FnvHashSet<PeerId>,
88
89 /// Periodic job for re-publication of provider records for keys
90 /// provided by the local node.
91 add_provider_job: Option<AddProviderJob>,
92
93 /// Periodic job for (re-)replication and (re-)publishing of
94 /// regular (value-)records.
95 put_record_job: Option<PutRecordJob>,
96
97 /// The TTL of regular (value-)records.
98 record_ttl: Option<Duration>,
99
100 /// The TTL of provider records.
101 provider_record_ttl: Option<Duration>,
102
103 /// Queued events to return when the behaviour is being polled.
104 queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,
105
106 listen_addresses: ListenAddresses,
107
108 external_addresses: ExternalAddresses,
109
110 connections: HashMap<ConnectionId, PeerId>,
111
112 /// See [`Config::caching`].
113 caching: Caching,
114
115 local_peer_id: PeerId,
116
117 mode: Mode,
118 auto_mode: bool,
119 no_events_waker: Option<Waker>,
120
121 /// The record storage.
122 store: TStore,
123
124 /// Tracks the status of the current bootstrap.
125 bootstrap_status: bootstrap::Status,
126}
127
128/// The configurable strategies for the insertion of peers
129/// and their addresses into the k-buckets of the Kademlia
130/// routing table.
131#[derive(Copy, Clone, Debug, PartialEq, Eq)]
132pub enum BucketInserts {
133 /// Whenever a connection to a peer is established as a
134 /// result of a dialing attempt and that peer is not yet
135 /// in the routing table, it is inserted as long as there
136 /// is a free slot in the corresponding k-bucket. If the
137 /// k-bucket is full but still has a free pending slot,
138 /// it may be inserted into the routing table at a later time if an unresponsive
139 /// disconnected peer is evicted from the bucket.
140 OnConnected,
141 /// New peers and addresses are only added to the routing table via
142 /// explicit calls to [`Behaviour::add_address`].
143 ///
144 /// > **Note**: Even though peers can only get into the
145 /// > routing table as a result of [`Behaviour::add_address`],
146 /// > routing table entries are still updated as peers
147 /// > connect and disconnect (i.e. the order of the entries
148 /// > as well as the network addresses).
149 Manual,
150}
151
152/// The configurable filtering strategies for the acceptance of
153/// incoming records.
154///
155/// This can be used for e.g. signature verification or validating
156/// the accompanying [`Key`].
157///
158/// [`Key`]: crate::record::Key
159#[derive(Copy, Clone, Debug, PartialEq, Eq)]
160pub enum StoreInserts {
161 /// Whenever a (provider) record is received,
162 /// the record is forwarded immediately to the [`RecordStore`].
163 Unfiltered,
164 /// Whenever a (provider) record is received, an event is emitted.
165 /// Provider records generate a [`InboundRequest::AddProvider`] under
166 /// [`Event::InboundRequest`], normal records generate a [`InboundRequest::PutRecord`]
167 /// under [`Event::InboundRequest`].
168 ///
169 /// When deemed valid, a (provider) record needs to be explicitly stored in
170 /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
171 /// whichever is applicable. A mutable reference to the [`RecordStore`] can
172 /// be retrieved via [`Behaviour::store_mut`].
173 FilterBoth,
174}
175
176/// The configuration for the `Kademlia` behaviour.
177///
178/// The configuration is consumed by [`Behaviour::new`].
179#[derive(Debug, Clone)]
180pub struct Config {
181 kbucket_config: KBucketConfig,
182 query_config: QueryConfig,
183 protocol_config: ProtocolConfig,
184 record_ttl: Option<Duration>,
185 record_replication_interval: Option<Duration>,
186 record_publication_interval: Option<Duration>,
187 record_filtering: StoreInserts,
188 provider_record_ttl: Option<Duration>,
189 provider_publication_interval: Option<Duration>,
190 kbucket_inserts: BucketInserts,
191 caching: Caching,
192 periodic_bootstrap_interval: Option<Duration>,
193 automatic_bootstrap_throttle: Option<Duration>,
194}
195
196impl Default for Config {
197 /// Returns the default configuration.
198 ///
199 /// Deprecated: use `Config::new` instead.
200 fn default() -> Self {
201 Self::new(protocol::DEFAULT_PROTO_NAME)
202 }
203}
204
205/// The configuration for Kademlia "write-back" caching after successful
206/// lookups via [`Behaviour::get_record`].
207#[derive(Debug, Clone)]
208pub enum Caching {
209 /// Caching is disabled and the peers closest to records being looked up
210 /// that do not return a record are not tracked, i.e.
211 /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
212 Disabled,
213 /// Up to `max_peers` peers not returning a record that are closest to the key
214 /// being looked up are tracked and returned in
215 /// [`GetRecordOk::FinishedWithNoAdditionalRecord`]. The write-back operation must be
216 /// performed explicitly, if desired and after choosing a record from the results, via
217 /// [`Behaviour::put_record_to`].
218 Enabled { max_peers: u16 },
219}
220
221impl Config {
222 /// Builds a new `Config` with the given protocol name.
223 pub fn new(protocol_name: StreamProtocol) -> Self {
224 Config {
225 kbucket_config: KBucketConfig::default(),
226 query_config: QueryConfig::default(),
227 protocol_config: ProtocolConfig::new(protocol_name),
228 record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
229 record_replication_interval: Some(Duration::from_secs(60 * 60)),
230 record_publication_interval: Some(Duration::from_secs(22 * 60 * 60)),
231 record_filtering: StoreInserts::Unfiltered,
232 provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
233 provider_record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
234 kbucket_inserts: BucketInserts::OnConnected,
235 caching: Caching::Enabled { max_peers: 1 },
236 periodic_bootstrap_interval: Some(Duration::from_secs(5 * 60)),
237 automatic_bootstrap_throttle: Some(bootstrap::DEFAULT_AUTOMATIC_THROTTLE),
238 }
239 }
240
241 /// Sets the timeout for a single query.
242 ///
243 /// > **Note**: A single query usually comprises at least as many requests
244 /// > as the replication factor, i.e. this is not a request timeout.
245 ///
246 /// The default is 60 seconds.
247 pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
248 self.query_config.timeout = timeout;
249 self
250 }
251
252 /// Sets the replication factor to use.
253 ///
254 /// The replication factor determines to how many closest peers
255 /// a record is replicated. The default is [`crate::K_VALUE`].
256 pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
257 self.query_config.replication_factor = replication_factor;
258 self
259 }
260
261 /// Sets the allowed level of parallelism for iterative queries.
262 ///
263 /// The `α` parameter in the Kademlia paper. The maximum number of peers
264 /// that an iterative query is allowed to wait for in parallel while
265 /// iterating towards the closest nodes to a target. Defaults to
266 /// `ALPHA_VALUE`.
267 ///
268 /// This only controls the level of parallelism of an iterative query, not
269 /// the level of parallelism of a query to a fixed set of peers.
270 ///
271 /// When used with [`Config::disjoint_query_paths`] it equals
272 /// the amount of disjoint paths used.
273 pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
274 self.query_config.parallelism = parallelism;
275 self
276 }
277
278 /// Require iterative queries to use disjoint paths for increased resiliency
279 /// in the presence of potentially adversarial nodes.
280 ///
281 /// When enabled the number of disjoint paths used equals the configured
282 /// parallelism.
283 ///
284 /// See the S/Kademlia paper for more information on the high level design
285 /// as well as its security improvements.
286 pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
287 self.query_config.disjoint_query_paths = enabled;
288 self
289 }
290
291 /// Sets the TTL for stored records.
292 ///
293 /// The TTL should be significantly longer than the (re-)publication
294 /// interval, to avoid premature expiration of records. The default is 36
295 /// hours.
296 ///
297 /// `None` means records never expire.
298 ///
299 /// Does not apply to provider records.
300 pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
301 self.record_ttl = record_ttl;
302 self
303 }
304
305 /// Sets whether or not records should be filtered before being stored.
306 ///
307 /// See [`StoreInserts`] for the different values.
308 /// Defaults to [`StoreInserts::Unfiltered`].
309 pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
310 self.record_filtering = filtering;
311 self
312 }
313
314 /// Sets the (re-)replication interval for stored records.
315 ///
316 /// Periodic replication of stored records ensures that the records
317 /// are always replicated to the available nodes closest to the key in the
318 /// context of DHT topology changes (i.e. nodes joining and leaving), thus
319 /// ensuring persistence until the record expires. Replication does not
320 /// prolong the regular lifetime of a record (for otherwise it would live
321 /// forever regardless of the configured TTL). The expiry of a record
322 /// is only extended through re-publication.
323 ///
324 /// This interval should be significantly shorter than the publication
325 /// interval, to ensure persistence between re-publications. The default
326 /// is 1 hour.
327 ///
328 /// `None` means that stored records are never re-replicated.
329 ///
330 /// Does not apply to provider records.
331 pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
332 self.record_replication_interval = interval;
333 self
334 }
335
336 /// Sets the (re-)publication interval of stored records.
337 ///
338 /// Records persist in the DHT until they expire. By default, published
339 /// records are re-published in regular intervals for as long as the record
340 /// exists in the local storage of the original publisher, thereby extending
341 /// the records lifetime.
342 ///
343 /// This interval should be significantly shorter than the record TTL, to
344 /// ensure records do not expire prematurely. The default is 24 hours.
345 ///
346 /// `None` means that stored records are never automatically re-published.
347 ///
348 /// Does not apply to provider records.
349 pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
350 self.record_publication_interval = interval;
351 self
352 }
353
354 /// Sets the TTL for provider records.
355 ///
356 /// `None` means that stored provider records never expire.
357 ///
358 /// Must be significantly larger than the provider publication interval.
359 pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
360 self.provider_record_ttl = ttl;
361 self
362 }
363
364 /// Sets the interval at which provider records for keys provided
365 /// by the local node are re-published.
366 ///
367 /// `None` means that stored provider records are never automatically
368 /// re-published.
369 ///
370 /// Must be significantly less than the provider record TTL.
371 pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
372 self.provider_publication_interval = interval;
373 self
374 }
375
376 /// Modifies the maximum allowed size of individual Kademlia packets.
377 ///
378 /// It might be necessary to increase this value if trying to put large
379 /// records.
380 pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
381 self.protocol_config.set_max_packet_size(size);
382 self
383 }
384
385 /// Sets the k-bucket insertion strategy for the Kademlia routing table.
386 pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
387 self.kbucket_inserts = inserts;
388 self
389 }
390
391 /// Sets the [`Caching`] strategy to use for successful lookups.
392 ///
393 /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
394 /// Hence, with default settings and a lookup quorum of 1, a successful lookup
395 /// will result in the record being cached at the closest node to the key that
396 /// did not return the record, i.e. the standard Kademlia behaviour.
397 pub fn set_caching(&mut self, c: Caching) -> &mut Self {
398 self.caching = c;
399 self
400 }
401
402 /// Sets the interval on which [`Behaviour::bootstrap`] is called periodically.
403 ///
404 /// * Default to `5` minutes.
405 /// * Set to `None` to disable periodic bootstrap.
406 pub fn set_periodic_bootstrap_interval(&mut self, interval: Option<Duration>) -> &mut Self {
407 self.periodic_bootstrap_interval = interval;
408 self
409 }
410
411 /// Sets the configuration for the k-buckets.
412 ///
413 /// * Default to K_VALUE.
414 ///
415 /// **WARNING**: setting a `size` higher that `K_VALUE` may imply additional memory allocations.
416 pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
417 self.kbucket_config.set_bucket_size(size);
418 self
419 }
420
421 /// Sets the timeout duration after creation of a pending entry after which
422 /// it becomes eligible for insertion into a full bucket, replacing the
423 /// least-recently (dis)connected node.
424 ///
425 /// * Default to `60` s.
426 pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
427 self.kbucket_config.set_pending_timeout(timeout);
428 self
429 }
430
431 /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted
432 /// in the routing table. This prevent cascading bootstrap requests when multiple peers are
433 /// inserted into the routing table "at the same time". This also allows to wait a little
434 /// bit for other potential peers to be inserted into the routing table before triggering a
435 /// bootstrap, giving more context to the future bootstrap request.
436 ///
437 /// * Default to `500` ms.
438 /// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a
439 /// new peer is inserted in the routing table.
440 /// * Set to `None` to disable automatic bootstrap (no bootstrap request will be triggered when
441 /// a new peer is inserted in the routing table).
442 #[cfg(test)]
443 pub(crate) fn set_automatic_bootstrap_throttle(
444 &mut self,
445 duration: Option<Duration>,
446 ) -> &mut Self {
447 self.automatic_bootstrap_throttle = duration;
448 self
449 }
450}
451
452impl<TStore> Behaviour<TStore>
453where
454 TStore: RecordStore + Send + 'static,
455{
456 /// Creates a new `Kademlia` network behaviour with a default configuration.
457 pub fn new(id: PeerId, store: TStore) -> Self {
458 Self::with_config(id, store, Default::default())
459 }
460
461 /// Get the protocol name of this kademlia instance.
462 pub fn protocol_names(&self) -> &[StreamProtocol] {
463 self.protocol_config.protocol_names()
464 }
465
466 /// Creates a new `Kademlia` network behaviour with the given configuration.
467 pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
468 let local_key = kbucket::Key::from(id);
469
470 let put_record_job = config
471 .record_replication_interval
472 .or(config.record_publication_interval)
473 .map(|interval| {
474 PutRecordJob::new(
475 id,
476 interval,
477 config.record_publication_interval,
478 config.record_ttl,
479 )
480 });
481
482 let add_provider_job = config
483 .provider_publication_interval
484 .map(AddProviderJob::new);
485
486 Behaviour {
487 store,
488 caching: config.caching,
489 kbuckets: KBucketsTable::new(local_key, config.kbucket_config),
490 kbucket_inserts: config.kbucket_inserts,
491 protocol_config: config.protocol_config,
492 record_filtering: config.record_filtering,
493 queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
494 listen_addresses: Default::default(),
495 queries: QueryPool::new(config.query_config),
496 connected_peers: Default::default(),
497 add_provider_job,
498 put_record_job,
499 record_ttl: config.record_ttl,
500 provider_record_ttl: config.provider_record_ttl,
501 external_addresses: Default::default(),
502 local_peer_id: id,
503 connections: Default::default(),
504 mode: Mode::Client,
505 auto_mode: true,
506 no_events_waker: None,
507 bootstrap_status: bootstrap::Status::new(
508 config.periodic_bootstrap_interval,
509 config.automatic_bootstrap_throttle,
510 ),
511 }
512 }
513
514 /// Gets an iterator over immutable references to all running queries.
515 pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
516 self.queries.iter().filter_map(|query| {
517 if !query.is_finished() {
518 Some(QueryRef { query })
519 } else {
520 None
521 }
522 })
523 }
524
525 /// Gets an iterator over mutable references to all running queries.
526 pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
527 self.queries.iter_mut().filter_map(|query| {
528 if !query.is_finished() {
529 Some(QueryMut { query })
530 } else {
531 None
532 }
533 })
534 }
535
536 /// Gets an immutable reference to a running query, if it exists.
537 pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
538 self.queries.get(id).and_then(|query| {
539 if !query.is_finished() {
540 Some(QueryRef { query })
541 } else {
542 None
543 }
544 })
545 }
546
547 /// Gets a mutable reference to a running query, if it exists.
548 pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
549 self.queries.get_mut(id).and_then(|query| {
550 if !query.is_finished() {
551 Some(QueryMut { query })
552 } else {
553 None
554 }
555 })
556 }
557
558 /// Adds a known listen address of a peer participating in the DHT to the
559 /// routing table.
560 ///
561 /// Explicitly adding addresses of peers serves two purposes:
562 ///
563 /// 1. In order for a node to join the DHT, it must know about at least one other node of the
564 /// DHT.
565 ///
566 /// 2. When a remote peer initiates a connection and that peer is not yet in the routing
567 /// table, the `Kademlia` behaviour must be informed of an address on which that peer is
568 /// listening for connections before it can be added to the routing table from where it can
569 /// subsequently be discovered by all peers in the DHT.
570 ///
571 /// If the routing table has been updated as a result of this operation,
572 /// a [`Event::RoutingUpdated`] event is emitted.
573 pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
574 // ensuring address is a fully-qualified /p2p multiaddr
575 let Ok(address) = address.with_p2p(*peer) else {
576 return RoutingUpdate::Failed;
577 };
578 let key = kbucket::Key::from(*peer);
579 match self.kbuckets.entry(&key) {
580 Some(kbucket::Entry::Present(mut entry, _)) => {
581 if entry.value().insert(address) {
582 self.queued_events
583 .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
584 peer: *peer,
585 is_new_peer: false,
586 addresses: entry.value().clone(),
587 old_peer: None,
588 bucket_range: self
589 .kbuckets
590 .bucket(&key)
591 .map(|b| b.range())
592 .expect("Not kbucket::Entry::SelfEntry."),
593 }))
594 }
595 RoutingUpdate::Success
596 }
597 Some(kbucket::Entry::Pending(mut entry, _)) => {
598 entry.value().insert(address);
599 RoutingUpdate::Pending
600 }
601 Some(kbucket::Entry::Absent(entry)) => {
602 let addresses = Addresses::new(address);
603 let status = if self.connected_peers.contains(peer) {
604 NodeStatus::Connected
605 } else {
606 NodeStatus::Disconnected
607 };
608 match entry.insert(addresses.clone(), status) {
609 kbucket::InsertResult::Inserted => {
610 self.bootstrap_on_low_peers();
611
612 self.queued_events.push_back(ToSwarm::GenerateEvent(
613 Event::RoutingUpdated {
614 peer: *peer,
615 is_new_peer: true,
616 addresses,
617 old_peer: None,
618 bucket_range: self
619 .kbuckets
620 .bucket(&key)
621 .map(|b| b.range())
622 .expect("Not kbucket::Entry::SelfEntry."),
623 },
624 ));
625 RoutingUpdate::Success
626 }
627 kbucket::InsertResult::Full => {
628 tracing::debug!(%peer, "Bucket full. Peer not added to routing table");
629 RoutingUpdate::Failed
630 }
631 kbucket::InsertResult::Pending { disconnected } => {
632 self.queued_events.push_back(ToSwarm::Dial {
633 opts: DialOpts::peer_id(disconnected.into_preimage()).build(),
634 });
635 RoutingUpdate::Pending
636 }
637 }
638 }
639 None => RoutingUpdate::Failed,
640 }
641 }
642
643 /// Removes an address of a peer from the routing table.
644 ///
645 /// If the given address is the last address of the peer in the
646 /// routing table, the peer is removed from the routing table
647 /// and `Some` is returned with a view of the removed entry.
648 /// The same applies if the peer is currently pending insertion
649 /// into the routing table.
650 ///
651 /// If the given peer or address is not in the routing table,
652 /// this is a no-op.
653 pub fn remove_address(
654 &mut self,
655 peer: &PeerId,
656 address: &Multiaddr,
657 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
658 let address = &address.to_owned().with_p2p(*peer).ok()?;
659 let key = kbucket::Key::from(*peer);
660 match self.kbuckets.entry(&key)? {
661 kbucket::Entry::Present(mut entry, _) => {
662 if entry.value().remove(address).is_err() {
663 Some(entry.remove()) // it is the last address, thus remove the peer.
664 } else {
665 None
666 }
667 }
668 kbucket::Entry::Pending(mut entry, _) => {
669 if entry.value().remove(address).is_err() {
670 Some(entry.remove()) // it is the last address, thus remove the peer.
671 } else {
672 None
673 }
674 }
675 kbucket::Entry::Absent(..) => None,
676 }
677 }
678
679 /// Removes a peer from the routing table.
680 ///
681 /// Returns `None` if the peer was not in the routing table,
682 /// not even pending insertion.
683 pub fn remove_peer(
684 &mut self,
685 peer: &PeerId,
686 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
687 let key = kbucket::Key::from(*peer);
688 match self.kbuckets.entry(&key)? {
689 kbucket::Entry::Present(entry, _) => Some(entry.remove()),
690 kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
691 kbucket::Entry::Absent(..) => None,
692 }
693 }
694
695 /// Returns an iterator over all non-empty buckets in the routing table.
696 pub fn kbuckets(
697 &mut self,
698 ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
699 self.kbuckets.iter().filter(|b| !b.is_empty())
700 }
701
702 /// Returns the k-bucket for the distance to the given key.
703 ///
704 /// Returns `None` if the given key refers to the local key.
705 pub fn kbucket<K>(
706 &mut self,
707 key: K,
708 ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
709 where
710 K: Into<kbucket::Key<K>> + Clone,
711 {
712 self.kbuckets.bucket(&key.into())
713 }
714
715 /// Initiates an iterative query for the closest peers to the given key.
716 ///
717 /// The result of the query is delivered in a
718 /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
719 pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
720 where
721 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
722 {
723 self.get_closest_peers_inner(key, None)
724 }
725
726 /// Initiates an iterative query for the closest peers to the given key.
727 /// The expected responding peers is specified by `num_results`
728 /// Note that the result is capped after exceeds K_VALUE
729 ///
730 /// The result of the query is delivered in a
731 /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
732 pub fn get_n_closest_peers<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
733 where
734 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
735 {
736 // The inner code never expect higher than K_VALUE results to be returned.
737 // And removing such cap will be tricky,
738 // since it would involve forging a new key and additional requests.
739 // Hence bound to K_VALUE here to set clear expectation and prevent unexpected behaviour.
740 let capped_num_results = std::cmp::min(num_results, K_VALUE);
741 self.get_closest_peers_inner(key, Some(capped_num_results))
742 }
743
744 fn get_closest_peers_inner<K>(&mut self, key: K, num_results: Option<NonZeroUsize>) -> QueryId
745 where
746 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
747 {
748 let target: kbucket::Key<K> = key.clone().into();
749 let key: Vec<u8> = key.into();
750 let info = QueryInfo::GetClosestPeers {
751 key,
752 step: ProgressStep::first(),
753 num_results,
754 };
755 let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
756 self.queries.add_iter_closest(target, peer_keys, info)
757 }
758
759 /// Returns all peers ordered by distance to the given key; takes peers from local routing table
760 /// only.
761 pub fn get_closest_local_peers<'a, K: Clone>(
762 &'a mut self,
763 key: &'a kbucket::Key<K>,
764 ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
765 self.kbuckets.closest_keys(key)
766 }
767
768 /// Finds the closest peers to a `key` in the context of a request by the `source` peer, such
769 /// that the `source` peer is never included in the result.
770 ///
771 /// Takes peers from local routing table only. Only returns number of peers equal to configured
772 /// replication factor.
773 pub fn find_closest_local_peers<'a, K: Clone>(
774 &'a mut self,
775 key: &'a kbucket::Key<K>,
776 source: &'a PeerId,
777 ) -> impl Iterator<Item = KadPeer> + 'a {
778 self.kbuckets
779 .closest(key)
780 .filter(move |e| e.node.key.preimage() != source)
781 .take(self.queries.config().replication_factor.get())
782 .map(KadPeer::from)
783 }
784
785 /// Performs a lookup for a record in the DHT.
786 ///
787 /// The result of this operation is delivered in a
788 /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
789 pub fn get_record(&mut self, key: record::Key) -> QueryId {
790 let record = if let Some(record) = self.store.get(&key) {
791 if record.is_expired(Instant::now()) {
792 self.store.remove(&key);
793 None
794 } else {
795 Some(PeerRecord {
796 peer: None,
797 record: record.into_owned(),
798 })
799 }
800 } else {
801 None
802 };
803
804 let step = ProgressStep::first();
805
806 let target = kbucket::Key::new(key.clone());
807 let info = if record.is_some() {
808 QueryInfo::GetRecord {
809 key,
810 step: step.next(),
811 found_a_record: true,
812 cache_candidates: BTreeMap::new(),
813 }
814 } else {
815 QueryInfo::GetRecord {
816 key,
817 step: step.clone(),
818 found_a_record: false,
819 cache_candidates: BTreeMap::new(),
820 }
821 };
822 let peers = self.kbuckets.closest_keys(&target);
823 let id = self.queries.add_iter_closest(target.clone(), peers, info);
824
825 // No queries were actually done for the results yet.
826 let stats = QueryStats::empty();
827
828 if let Some(record) = record {
829 self.queued_events
830 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
831 id,
832 result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
833 step,
834 stats,
835 }));
836 }
837
838 id
839 }
840
841 /// Stores a record in the DHT, locally as well as at the nodes
842 /// closest to the key as per the xor distance metric.
843 ///
844 /// Returns `Ok` if a record has been stored locally, providing the
845 /// `QueryId` of the initial query that replicates the record in the DHT.
846 /// The result of the query is eventually reported as a
847 /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
848 ///
849 /// The record is always stored locally with the given expiration. If the record's
850 /// expiration is `None`, the common case, it does not expire in local storage
851 /// but is still replicated with the configured record TTL. To remove the record
852 /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
853 ///
854 /// After the initial publication of the record, it is subject to (re-)replication
855 /// and (re-)publication as per the configured intervals. Periodic (re-)publication
856 /// does not update the record's expiration in local storage, thus a given record
857 /// with an explicit expiration will always expire at that instant and until then
858 /// is subject to regular (re-)replication and (re-)publication.
859 pub fn put_record(
860 &mut self,
861 mut record: Record,
862 quorum: Quorum,
863 ) -> Result<QueryId, store::Error> {
864 record.publisher = Some(*self.kbuckets.local_key().preimage());
865 self.store.put(record.clone())?;
866 record.expires = record
867 .expires
868 .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
869 let quorum = quorum.eval(self.queries.config().replication_factor);
870 let target = kbucket::Key::new(record.key.clone());
871 let peers = self.kbuckets.closest_keys(&target);
872 let context = PutRecordContext::Publish;
873 let info = QueryInfo::PutRecord {
874 context,
875 record,
876 quorum,
877 phase: PutRecordPhase::GetClosestPeers,
878 };
879 Ok(self.queries.add_iter_closest(target.clone(), peers, info))
880 }
881
882 /// Stores a record at specific peers, without storing it locally.
883 ///
884 /// The given [`Quorum`] is understood in the context of the total
885 /// number of distinct peers given.
886 ///
887 /// If the record's expiration is `None`, the configured record TTL is used.
888 ///
889 /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be
890 /// > used to selectively update or store a record to specific peers
891 /// > for the purpose of e.g. making sure these peers have the latest
892 /// > "version" of a record or to "cache" a record at further peers
893 /// > to increase the lookup success rate on the DHT for other peers.
894 /// >
895 /// > In particular, there is no automatic storing of records performed, and this
896 /// > method must be used to ensure the standard Kademlia
897 /// > procedure of "caching" (i.e. storing) a found record at the closest
898 /// > node to the key that _did not_ return it.
899 pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
900 where
901 I: ExactSizeIterator<Item = PeerId>,
902 {
903 let quorum = if peers.len() > 0 {
904 quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
905 } else {
906 // If no peers are given, we just let the query fail immediately
907 // due to the fact that the quorum must be at least one, instead of
908 // introducing a new kind of error.
909 NonZeroUsize::new(1).expect("1 > 0")
910 };
911 record.expires = record
912 .expires
913 .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
914 let context = PutRecordContext::Custom;
915 let info = QueryInfo::PutRecord {
916 context,
917 record,
918 quorum,
919 phase: PutRecordPhase::PutRecord {
920 success: Vec::new(),
921 get_closest_peers_stats: QueryStats::empty(),
922 },
923 };
924 self.queries.add_fixed(peers, info)
925 }
926
927 /// Removes the record with the given key from _local_ storage,
928 /// if the local node is the publisher of the record.
929 ///
930 /// Has no effect if a record for the given key is stored locally but
931 /// the local node is not a publisher of the record.
932 ///
933 /// This is a _local_ operation. However, it also has the effect that
934 /// the record will no longer be periodically re-published, allowing the
935 /// record to eventually expire throughout the DHT.
936 pub fn remove_record(&mut self, key: &record::Key) {
937 if let Some(r) = self.store.get(key) {
938 if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
939 self.store.remove(key)
940 }
941 }
942 }
943
944 /// Gets a mutable reference to the record store.
945 pub fn store_mut(&mut self) -> &mut TStore {
946 &mut self.store
947 }
948
949 /// Bootstraps the local node to join the DHT.
950 ///
951 /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
952 /// own ID in the DHT. This introduces the local node to the other nodes
953 /// in the DHT and populates its routing table with the closest neighbours.
954 ///
955 /// Subsequently, all buckets farther from the bucket of the closest neighbour are
956 /// refreshed by initiating an additional bootstrapping query for each such
957 /// bucket with random keys.
958 ///
959 /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
960 /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
961 /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
962 /// with one such event per bootstrapping query.
963 ///
964 /// Returns `Err` if bootstrapping is impossible due an empty routing table.
965 ///
966 /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
967 /// > See [`Behaviour::add_address`].
968 ///
969 /// > **Note**: Bootstrap does not require to be called manually. It is periodically
970 /// > invoked at regular intervals based on the configured `periodic_bootstrap_interval` (see
971 /// > [`Config::set_periodic_bootstrap_interval`] for details) and it is also automatically
972 /// > invoked
973 /// > when a new peer is inserted in the routing table.
974 /// > This parameter is used to call [`Behaviour::bootstrap`] periodically and automatically
975 /// > to ensure a healthy routing table.
976 pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
977 let local_key = *self.kbuckets.local_key();
978 let info = QueryInfo::Bootstrap {
979 peer: *local_key.preimage(),
980 remaining: None,
981 step: ProgressStep::first(),
982 };
983 let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
984 if peers.is_empty() {
985 self.bootstrap_status.reset_timers();
986 Err(NoKnownPeers())
987 } else {
988 self.bootstrap_status.on_started();
989 Ok(self.queries.add_iter_closest(local_key, peers, info))
990 }
991 }
992
993 /// Establishes the local node as a provider of a value for the given key.
994 ///
995 /// This operation publishes a provider record with the given key and
996 /// identity of the local node to the peers closest to the key, thus establishing
997 /// the local node as a provider.
998 ///
999 /// Returns `Ok` if a provider record has been stored locally, providing the
1000 /// `QueryId` of the initial query that announces the local node as a provider.
1001 ///
1002 /// The publication of the provider records is periodically repeated as per the
1003 /// configured interval, to renew the expiry and account for changes to the DHT
1004 /// topology. A provider record may be removed from local storage and
1005 /// thus no longer re-published by calling [`Behaviour::stop_providing`].
1006 ///
1007 /// In contrast to the standard Kademlia push-based model for content distribution
1008 /// implemented by [`Behaviour::put_record`], the provider API implements a
1009 /// pull-based model that may be used in addition or as an alternative.
1010 /// The means by which the actual value is obtained from a provider is out of scope
1011 /// of the libp2p Kademlia provider API.
1012 ///
1013 /// The results of the (repeated) provider announcements sent by this node are
1014 /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
1015 pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
1016 // Note: We store our own provider records locally without local addresses
1017 // to avoid redundant storage and outdated addresses. Instead these are
1018 // acquired on demand when returning a `ProviderRecord` for the local node.
1019 let local_addrs = Vec::new();
1020 let record = ProviderRecord::new(
1021 key.clone(),
1022 *self.kbuckets.local_key().preimage(),
1023 local_addrs,
1024 );
1025 self.store.add_provider(record)?;
1026 let target = kbucket::Key::new(key.clone());
1027 let peers = self.kbuckets.closest_keys(&target);
1028 let context = AddProviderContext::Publish;
1029 let info = QueryInfo::AddProvider {
1030 context,
1031 key,
1032 phase: AddProviderPhase::GetClosestPeers,
1033 };
1034 let id = self.queries.add_iter_closest(target.clone(), peers, info);
1035 Ok(id)
1036 }
1037
1038 /// Stops the local node from announcing that it is a provider for the given key.
1039 ///
1040 /// This is a local operation. The local node will still be considered as a
1041 /// provider for the key by other nodes until these provider records expire.
1042 pub fn stop_providing(&mut self, key: &record::Key) {
1043 self.store
1044 .remove_provider(key, self.kbuckets.local_key().preimage());
1045 }
1046
1047 /// Performs a lookup for providers of a value to the given key.
1048 ///
1049 /// The result of this operation is delivered in a
1050 /// reported via [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
1051 pub fn get_providers(&mut self, key: record::Key) -> QueryId {
1052 let providers: HashSet<_> = self
1053 .store
1054 .providers(&key)
1055 .into_iter()
1056 .filter(|p| !p.is_expired(Instant::now()))
1057 .map(|p| p.provider)
1058 .collect();
1059
1060 let step = ProgressStep::first();
1061
1062 let info = QueryInfo::GetProviders {
1063 key: key.clone(),
1064 providers_found: providers.len(),
1065 step: if providers.is_empty() {
1066 step.clone()
1067 } else {
1068 step.next()
1069 },
1070 };
1071
1072 let target = kbucket::Key::new(key.clone());
1073 let peers = self.kbuckets.closest_keys(&target);
1074 let id = self.queries.add_iter_closest(target.clone(), peers, info);
1075
1076 // No queries were actually done for the results yet.
1077 let stats = QueryStats::empty();
1078
1079 if !providers.is_empty() {
1080 self.queued_events
1081 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
1082 id,
1083 result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
1084 key,
1085 providers,
1086 })),
1087 step,
1088 stats,
1089 }));
1090 }
1091 id
1092 }
1093
1094 /// Set the [`Mode`] in which we should operate.
1095 ///
1096 /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we
1097 /// have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`].
1098 ///
1099 /// Setting a mode via this function disables this automatic behaviour and unconditionally
1100 /// operates in the specified mode. To reactivate the automatic configuration, pass [`None`]
1101 /// instead.
1102 pub fn set_mode(&mut self, mode: Option<Mode>) {
1103 match mode {
1104 Some(mode) => {
1105 self.mode = mode;
1106 self.auto_mode = false;
1107 self.reconfigure_mode();
1108 }
1109 None => {
1110 self.auto_mode = true;
1111 self.determine_mode_from_external_addresses();
1112 }
1113 }
1114
1115 if let Some(waker) = self.no_events_waker.take() {
1116 waker.wake();
1117 }
1118 }
1119
1120 /// Get the [`Mode`] in which the DHT is currently operating.
1121 pub fn mode(&self) -> Mode {
1122 self.mode
1123 }
1124
1125 fn reconfigure_mode(&mut self) {
1126 if self.connections.is_empty() {
1127 return;
1128 }
1129
1130 let num_connections = self.connections.len();
1131
1132 tracing::debug!(
1133 "Re-configuring {} established connection{}",
1134 num_connections,
1135 if num_connections > 1 { "s" } else { "" }
1136 );
1137
1138 self.queued_events
1139 .extend(
1140 self.connections
1141 .iter()
1142 .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
1143 peer_id: *peer_id,
1144 handler: NotifyHandler::One(*conn_id),
1145 event: HandlerIn::ReconfigureMode {
1146 new_mode: self.mode,
1147 },
1148 }),
1149 );
1150 }
1151
1152 fn determine_mode_from_external_addresses(&mut self) {
1153 let old_mode = self.mode;
1154
1155 self.mode = match (self.external_addresses.as_slice(), self.mode) {
1156 ([], Mode::Server) => {
1157 tracing::debug!("Switching to client-mode because we no longer have any confirmed external addresses");
1158
1159 Mode::Client
1160 }
1161 ([], Mode::Client) => {
1162 // Previously client-mode, now also client-mode because no external addresses.
1163
1164 Mode::Client
1165 }
1166 (confirmed_external_addresses, Mode::Client) => {
1167 if tracing::enabled!(Level::DEBUG) {
1168 let confirmed_external_addresses =
1169 to_comma_separated_list(confirmed_external_addresses);
1170
1171 tracing::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
1172 }
1173
1174 Mode::Server
1175 }
1176 (confirmed_external_addresses, Mode::Server) => {
1177 debug_assert!(
1178 !confirmed_external_addresses.is_empty(),
1179 "Previous match arm handled empty list"
1180 );
1181
1182 // Previously, server-mode, now also server-mode because > 1 external address.
1183 // Don't log anything to avoid spam.
1184 Mode::Server
1185 }
1186 };
1187
1188 self.reconfigure_mode();
1189
1190 if old_mode != self.mode {
1191 self.queued_events
1192 .push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
1193 new_mode: self.mode,
1194 }));
1195 }
1196 }
1197
1198 /// Processes discovered peers from a successful request in an iterative `Query`.
1199 fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1200 where
1201 I: Iterator<Item = &'a KadPeer> + Clone,
1202 {
1203 let local_id = self.kbuckets.local_key().preimage();
1204 let others_iter = peers.filter(|p| &p.node_id != local_id);
1205 if let Some(query) = self.queries.get_mut(query_id) {
1206 tracing::trace!(peer=%source, query=?query_id, "Request to peer in query succeeded");
1207 for peer in others_iter.clone() {
1208 tracing::trace!(
1209 ?peer,
1210 %source,
1211 query=?query_id,
1212 "Peer reported by source in query"
1213 );
1214 let addrs = peer.multiaddrs.iter().cloned().collect();
1215 query.peers.addresses.insert(peer.node_id, addrs);
1216 }
1217 query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1218 }
1219 }
1220
1221 /// Collects all peers who are known to be providers of the value for a given `Multihash`.
1222 fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
1223 let kbuckets = &mut self.kbuckets;
1224 let connected = &mut self.connected_peers;
1225 let listen_addresses = &self.listen_addresses;
1226 let external_addresses = &self.external_addresses;
1227
1228 self.store
1229 .providers(key)
1230 .into_iter()
1231 .filter_map(move |p| {
1232 if &p.provider != source {
1233 let node_id = p.provider;
1234 let multiaddrs = p.addresses;
1235 let connection_ty = if connected.contains(&node_id) {
1236 ConnectionType::Connected
1237 } else {
1238 ConnectionType::NotConnected
1239 };
1240 if multiaddrs.is_empty() {
1241 // The provider is either the local node and we fill in
1242 // the local addresses on demand, or it is a legacy
1243 // provider record without addresses, in which case we
1244 // try to find addresses in the routing table, as was
1245 // done before provider records were stored along with
1246 // their addresses.
1247 if &node_id == kbuckets.local_key().preimage() {
1248 Some(
1249 listen_addresses
1250 .iter()
1251 .chain(external_addresses.iter())
1252 .cloned()
1253 .collect::<Vec<_>>(),
1254 )
1255 } else {
1256 let key = kbucket::Key::from(node_id);
1257 kbuckets
1258 .entry(&key)
1259 .as_mut()
1260 .and_then(|e| e.view())
1261 .map(|e| e.node.value.clone().into_vec())
1262 }
1263 } else {
1264 Some(multiaddrs)
1265 }
1266 .map(|multiaddrs| KadPeer {
1267 node_id,
1268 multiaddrs,
1269 connection_ty,
1270 })
1271 } else {
1272 None
1273 }
1274 })
1275 .take(self.queries.config().replication_factor.get())
1276 .collect()
1277 }
1278
1279 /// Starts an iterative `ADD_PROVIDER` query for the given key.
1280 fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
1281 let info = QueryInfo::AddProvider {
1282 context,
1283 key: key.clone(),
1284 phase: AddProviderPhase::GetClosestPeers,
1285 };
1286 let target = kbucket::Key::new(key);
1287 let peers = self.kbuckets.closest_keys(&target);
1288 self.queries.add_iter_closest(target.clone(), peers, info);
1289 }
1290
1291 /// Starts an iterative `PUT_VALUE` query for the given record.
1292 fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1293 let quorum = quorum.eval(self.queries.config().replication_factor);
1294 let target = kbucket::Key::new(record.key.clone());
1295 let peers = self.kbuckets.closest_keys(&target);
1296 let info = QueryInfo::PutRecord {
1297 record,
1298 quorum,
1299 context,
1300 phase: PutRecordPhase::GetClosestPeers,
1301 };
1302 self.queries.add_iter_closest(target.clone(), peers, info);
1303 }
1304
1305 /// Updates the routing table with a new connection status and address of a peer.
1306 fn connection_updated(
1307 &mut self,
1308 peer: PeerId,
1309 address: Option<Multiaddr>,
1310 new_status: NodeStatus,
1311 ) {
1312 let key = kbucket::Key::from(peer);
1313 match self.kbuckets.entry(&key) {
1314 Some(kbucket::Entry::Present(mut entry, old_status)) => {
1315 if old_status != new_status {
1316 entry.update(new_status)
1317 }
1318 if let Some(address) = address {
1319 if entry.value().insert(address) {
1320 self.queued_events.push_back(ToSwarm::GenerateEvent(
1321 Event::RoutingUpdated {
1322 peer,
1323 is_new_peer: false,
1324 addresses: entry.value().clone(),
1325 old_peer: None,
1326 bucket_range: self
1327 .kbuckets
1328 .bucket(&key)
1329 .map(|b| b.range())
1330 .expect("Not kbucket::Entry::SelfEntry."),
1331 },
1332 ))
1333 }
1334 }
1335 }
1336
1337 Some(kbucket::Entry::Pending(mut entry, old_status)) => {
1338 if let Some(address) = address {
1339 entry.value().insert(address);
1340 }
1341 if old_status != new_status {
1342 entry.update(new_status);
1343 }
1344 }
1345
1346 Some(kbucket::Entry::Absent(entry)) => {
1347 // Only connected nodes with a known address are newly inserted.
1348 if new_status != NodeStatus::Connected {
1349 return;
1350 }
1351 match (address, self.kbucket_inserts) {
1352 (None, _) => {
1353 self.queued_events
1354 .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
1355 }
1356 (Some(a), BucketInserts::Manual) => {
1357 self.queued_events
1358 .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
1359 peer,
1360 address: a,
1361 }));
1362 }
1363 (Some(a), BucketInserts::OnConnected) => {
1364 let addresses = Addresses::new(a);
1365 match entry.insert(addresses.clone(), new_status) {
1366 kbucket::InsertResult::Inserted => {
1367 self.bootstrap_on_low_peers();
1368
1369 let event = Event::RoutingUpdated {
1370 peer,
1371 is_new_peer: true,
1372 addresses,
1373 old_peer: None,
1374 bucket_range: self
1375 .kbuckets
1376 .bucket(&key)
1377 .map(|b| b.range())
1378 .expect("Not kbucket::Entry::SelfEntry."),
1379 };
1380 self.queued_events.push_back(ToSwarm::GenerateEvent(event));
1381 }
1382 kbucket::InsertResult::Full => {
1383 tracing::debug!(
1384 %peer,
1385 "Bucket full. Peer not added to routing table"
1386 );
1387 let address = addresses.first().clone();
1388 self.queued_events.push_back(ToSwarm::GenerateEvent(
1389 Event::RoutablePeer { peer, address },
1390 ));
1391 }
1392 kbucket::InsertResult::Pending { disconnected } => {
1393 let address = addresses.first().clone();
1394 self.queued_events.push_back(ToSwarm::GenerateEvent(
1395 Event::PendingRoutablePeer { peer, address },
1396 ));
1397
1398 // `disconnected` might already be in the process of re-connecting.
1399 // In other words `disconnected` might have already re-connected but
1400 // is not yet confirmed to support the Kademlia protocol via
1401 // [`HandlerEvent::ProtocolConfirmed`].
1402 //
1403 // Only try dialing peer if not currently connected.
1404 if !self.connected_peers.contains(disconnected.preimage()) {
1405 self.queued_events.push_back(ToSwarm::Dial {
1406 opts: DialOpts::peer_id(disconnected.into_preimage())
1407 .build(),
1408 })
1409 }
1410 }
1411 }
1412 }
1413 }
1414 }
1415 _ => {}
1416 }
1417 }
1418
1419 /// A new peer has been inserted in the routing table but we check if the routing
1420 /// table is currently small (less that `K_VALUE` peers are present) and only
1421 /// trigger a bootstrap in that case
1422 fn bootstrap_on_low_peers(&mut self) {
1423 if self
1424 .kbuckets()
1425 .map(|kbucket| kbucket.num_entries())
1426 .sum::<usize>()
1427 < K_VALUE.get()
1428 {
1429 self.bootstrap_status.trigger();
1430 }
1431 }
1432
1433 /// Handles a finished (i.e. successful) query.
1434 fn query_finished(&mut self, q: Query) -> Option<Event> {
1435 let query_id = q.id();
1436 tracing::trace!(query=?query_id, "Query finished");
1437 match q.info {
1438 QueryInfo::Bootstrap {
1439 peer,
1440 remaining,
1441 mut step,
1442 } => {
1443 let local_key = *self.kbuckets.local_key();
1444 let mut remaining = remaining.unwrap_or_else(|| {
1445 debug_assert_eq!(&peer, local_key.preimage());
1446 // The lookup for the local key finished. To complete the bootstrap process,
1447 // a bucket refresh should be performed for every bucket farther away than
1448 // the first non-empty bucket (which are most likely no more than the last
1449 // few, i.e. farthest, buckets).
1450 self.kbuckets
1451 .iter()
1452 .skip_while(|b| b.is_empty())
1453 .skip(1) // Skip the bucket with the closest neighbour.
1454 .map(|b| {
1455 // Try to find a key that falls into the bucket. While such keys can
1456 // be generated fully deterministically, the current libp2p kademlia
1457 // wire protocol requires transmission of the preimages of the actual
1458 // keys in the DHT keyspace, hence for now this is just a "best effort"
1459 // to find a key that hashes into a specific bucket. The probabilities
1460 // of finding a key in the bucket `b` with as most 16 trials are as
1461 // follows:
1462 //
1463 // Pr(bucket-255) = 1 - (1/2)^16 ~= 1
1464 // Pr(bucket-254) = 1 - (3/4)^16 ~= 1
1465 // Pr(bucket-253) = 1 - (7/8)^16 ~= 0.88
1466 // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
1467 // ...
1468 let mut target = kbucket::Key::from(PeerId::random());
1469 for _ in 0..16 {
1470 let d = local_key.distance(&target);
1471 if b.contains(&d) {
1472 break;
1473 }
1474 target = kbucket::Key::from(PeerId::random());
1475 }
1476 target
1477 })
1478 .collect::<Vec<_>>()
1479 .into_iter()
1480 });
1481
1482 let num_remaining = remaining.len() as u32;
1483
1484 if let Some(target) = remaining.next() {
1485 let info = QueryInfo::Bootstrap {
1486 peer: *target.preimage(),
1487 remaining: Some(remaining),
1488 step: step.next(),
1489 };
1490 let peers = self.kbuckets.closest_keys(&target);
1491 self.queries
1492 .continue_iter_closest(query_id, target, peers, info);
1493 } else {
1494 step.last = true;
1495 self.bootstrap_status.on_finish();
1496 };
1497
1498 Some(Event::OutboundQueryProgressed {
1499 id: query_id,
1500 stats: q.stats,
1501 result: QueryResult::Bootstrap(Ok(BootstrapOk {
1502 peer,
1503 num_remaining,
1504 })),
1505 step,
1506 })
1507 }
1508
1509 QueryInfo::GetClosestPeers { key, mut step, .. } => {
1510 step.last = true;
1511
1512 Some(Event::OutboundQueryProgressed {
1513 id: query_id,
1514 stats: q.stats,
1515 result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
1516 key,
1517 peers: q.peers.into_peerinfos_iter().collect(),
1518 })),
1519 step,
1520 })
1521 }
1522
1523 QueryInfo::GetProviders { mut step, .. } => {
1524 step.last = true;
1525
1526 Some(Event::OutboundQueryProgressed {
1527 id: query_id,
1528 stats: q.stats,
1529 result: QueryResult::GetProviders(Ok(
1530 GetProvidersOk::FinishedWithNoAdditionalRecord {
1531 closest_peers: q.peers.into_peerids_iter().collect(),
1532 },
1533 )),
1534 step,
1535 })
1536 }
1537
1538 QueryInfo::AddProvider {
1539 context,
1540 key,
1541 phase: AddProviderPhase::GetClosestPeers,
1542 } => {
1543 let provider_id = self.local_peer_id;
1544 let external_addresses = self.external_addresses.iter().cloned().collect();
1545 let info = QueryInfo::AddProvider {
1546 context,
1547 key,
1548 phase: AddProviderPhase::AddProvider {
1549 provider_id,
1550 external_addresses,
1551 get_closest_peers_stats: q.stats,
1552 },
1553 };
1554 self.queries
1555 .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1556 None
1557 }
1558
1559 QueryInfo::AddProvider {
1560 context,
1561 key,
1562 phase:
1563 AddProviderPhase::AddProvider {
1564 get_closest_peers_stats,
1565 ..
1566 },
1567 } => match context {
1568 AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
1569 id: query_id,
1570 stats: get_closest_peers_stats.merge(q.stats),
1571 result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
1572 step: ProgressStep::first_and_last(),
1573 }),
1574 AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
1575 id: query_id,
1576 stats: get_closest_peers_stats.merge(q.stats),
1577 result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
1578 step: ProgressStep::first_and_last(),
1579 }),
1580 },
1581
1582 QueryInfo::GetRecord {
1583 key,
1584 mut step,
1585 found_a_record,
1586 cache_candidates,
1587 } => {
1588 step.last = true;
1589
1590 let results = if found_a_record {
1591 Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
1592 } else {
1593 Err(GetRecordError::NotFound {
1594 key,
1595 closest_peers: q.peers.into_peerids_iter().collect(),
1596 })
1597 };
1598 Some(Event::OutboundQueryProgressed {
1599 id: query_id,
1600 stats: q.stats,
1601 result: QueryResult::GetRecord(results),
1602 step,
1603 })
1604 }
1605
1606 QueryInfo::PutRecord {
1607 context,
1608 record,
1609 quorum,
1610 phase: PutRecordPhase::GetClosestPeers,
1611 } => {
1612 let info = QueryInfo::PutRecord {
1613 context,
1614 record,
1615 quorum,
1616 phase: PutRecordPhase::PutRecord {
1617 success: vec![],
1618 get_closest_peers_stats: q.stats,
1619 },
1620 };
1621 self.queries
1622 .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1623 None
1624 }
1625
1626 QueryInfo::PutRecord {
1627 context,
1628 record,
1629 quorum,
1630 phase:
1631 PutRecordPhase::PutRecord {
1632 success,
1633 get_closest_peers_stats,
1634 },
1635 } => {
1636 let mk_result = |key: record::Key| {
1637 if success.len() >= quorum.get() {
1638 Ok(PutRecordOk { key })
1639 } else {
1640 Err(PutRecordError::QuorumFailed {
1641 key,
1642 quorum,
1643 success,
1644 })
1645 }
1646 };
1647 match context {
1648 PutRecordContext::Publish | PutRecordContext::Custom => {
1649 Some(Event::OutboundQueryProgressed {
1650 id: query_id,
1651 stats: get_closest_peers_stats.merge(q.stats),
1652 result: QueryResult::PutRecord(mk_result(record.key)),
1653 step: ProgressStep::first_and_last(),
1654 })
1655 }
1656 PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1657 id: query_id,
1658 stats: get_closest_peers_stats.merge(q.stats),
1659 result: QueryResult::RepublishRecord(mk_result(record.key)),
1660 step: ProgressStep::first_and_last(),
1661 }),
1662 PutRecordContext::Replicate => {
1663 tracing::debug!(record=?record.key, "Record replicated");
1664 None
1665 }
1666 }
1667 }
1668 }
1669 }
1670
1671 /// Handles a query that timed out.
1672 fn query_timeout(&mut self, query: Query) -> Option<Event> {
1673 let query_id = query.id();
1674 tracing::trace!(query=?query_id, "Query timed out");
1675 match query.info {
1676 QueryInfo::Bootstrap {
1677 peer,
1678 mut remaining,
1679 mut step,
1680 } => {
1681 let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
1682
1683 // Continue with the next bootstrap query if `remaining` is not empty.
1684 if let Some((target, remaining)) =
1685 remaining.take().and_then(|mut r| Some((r.next()?, r)))
1686 {
1687 let info = QueryInfo::Bootstrap {
1688 peer: target.into_preimage(),
1689 remaining: Some(remaining),
1690 step: step.next(),
1691 };
1692 let peers = self.kbuckets.closest_keys(&target);
1693 self.queries
1694 .continue_iter_closest(query_id, target, peers, info);
1695 } else {
1696 step.last = true;
1697 self.bootstrap_status.on_finish();
1698 }
1699
1700 Some(Event::OutboundQueryProgressed {
1701 id: query_id,
1702 stats: query.stats,
1703 result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
1704 peer,
1705 num_remaining,
1706 })),
1707 step,
1708 })
1709 }
1710
1711 QueryInfo::AddProvider { context, key, .. } => Some(match context {
1712 AddProviderContext::Publish => Event::OutboundQueryProgressed {
1713 id: query_id,
1714 stats: query.stats,
1715 result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
1716 step: ProgressStep::first_and_last(),
1717 },
1718 AddProviderContext::Republish => Event::OutboundQueryProgressed {
1719 id: query_id,
1720 stats: query.stats,
1721 result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
1722 step: ProgressStep::first_and_last(),
1723 },
1724 }),
1725
1726 QueryInfo::GetClosestPeers { key, mut step, .. } => {
1727 step.last = true;
1728 Some(Event::OutboundQueryProgressed {
1729 id: query_id,
1730 stats: query.stats,
1731 result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
1732 key,
1733 peers: query.peers.into_peerinfos_iter().collect(),
1734 })),
1735 step,
1736 })
1737 }
1738
1739 QueryInfo::PutRecord {
1740 record,
1741 quorum,
1742 context,
1743 phase,
1744 } => {
1745 let err = Err(PutRecordError::Timeout {
1746 key: record.key,
1747 quorum,
1748 success: match phase {
1749 PutRecordPhase::GetClosestPeers => vec![],
1750 PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
1751 },
1752 });
1753 match context {
1754 PutRecordContext::Publish | PutRecordContext::Custom => {
1755 Some(Event::OutboundQueryProgressed {
1756 id: query_id,
1757 stats: query.stats,
1758 result: QueryResult::PutRecord(err),
1759 step: ProgressStep::first_and_last(),
1760 })
1761 }
1762 PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1763 id: query_id,
1764 stats: query.stats,
1765 result: QueryResult::RepublishRecord(err),
1766 step: ProgressStep::first_and_last(),
1767 }),
1768 PutRecordContext::Replicate => match phase {
1769 PutRecordPhase::GetClosestPeers => {
1770 tracing::warn!(
1771 "Locating closest peers for replication failed: {:?}",
1772 err
1773 );
1774 None
1775 }
1776 PutRecordPhase::PutRecord { .. } => {
1777 tracing::debug!("Replicating record failed: {:?}", err);
1778 None
1779 }
1780 },
1781 }
1782 }
1783
1784 QueryInfo::GetRecord { key, mut step, .. } => {
1785 step.last = true;
1786
1787 Some(Event::OutboundQueryProgressed {
1788 id: query_id,
1789 stats: query.stats,
1790 result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
1791 step,
1792 })
1793 }
1794
1795 QueryInfo::GetProviders { key, mut step, .. } => {
1796 step.last = true;
1797
1798 Some(Event::OutboundQueryProgressed {
1799 id: query_id,
1800 stats: query.stats,
1801 result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
1802 key,
1803 closest_peers: query.peers.into_peerids_iter().collect(),
1804 })),
1805 step,
1806 })
1807 }
1808 }
1809 }
1810
1811 /// Processes a record received from a peer.
1812 fn record_received(
1813 &mut self,
1814 source: PeerId,
1815 connection: ConnectionId,
1816 request_id: RequestId,
1817 mut record: Record,
1818 ) {
1819 if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1820 // If the (alleged) publisher is the local node, do nothing. The record of
1821 // the original publisher should never change as a result of replication
1822 // and the publisher is always assumed to have the "right" value.
1823 self.queued_events.push_back(ToSwarm::NotifyHandler {
1824 peer_id: source,
1825 handler: NotifyHandler::One(connection),
1826 event: HandlerIn::PutRecordRes {
1827 key: record.key,
1828 value: record.value,
1829 request_id,
1830 },
1831 });
1832 return;
1833 }
1834
1835 let now = Instant::now();
1836
1837 // Calculate the expiration exponentially inversely proportional to the
1838 // number of nodes between the local node and the closest node to the key
1839 // (beyond the replication factor). This ensures avoiding over-caching
1840 // outside of the k closest nodes to a key.
1841 let target = kbucket::Key::new(record.key.clone());
1842 let num_between = self.kbuckets.count_nodes_between(&target);
1843 let k = self.queries.config().replication_factor.get();
1844 let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1845 let expiration = self
1846 .record_ttl
1847 .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
1848 // The smaller TTL prevails. Only if neither TTL is set is the record
1849 // stored "forever".
1850 record.expires = record.expires.or(expiration).min(expiration);
1851
1852 if let Some(job) = self.put_record_job.as_mut() {
1853 // Ignore the record in the next run of the replication
1854 // job, since we can assume the sender replicated the
1855 // record to the k closest peers. Effectively, only
1856 // one of the k closest peers performs a replication
1857 // in the configured interval, assuming a shared interval.
1858 job.skip(record.key.clone())
1859 }
1860
1861 // While records received from a publisher, as well as records that do
1862 // not exist locally should always (attempted to) be stored, there is a
1863 // choice here w.r.t. the handling of replicated records whose keys refer
1864 // to records that exist locally: The value and / or the publisher may
1865 // either be overridden or left unchanged. At the moment and in the
1866 // absence of a decisive argument for another option, both are always
1867 // overridden as it avoids having to load the existing record in the
1868 // first place.
1869
1870 if !record.is_expired(now) {
1871 // The record is cloned because of the weird libp2p protocol
1872 // requirement to send back the value in the response, although this
1873 // is a waste of resources.
1874 match self.record_filtering {
1875 StoreInserts::Unfiltered => match self.store.put(record.clone()) {
1876 Ok(()) => {
1877 tracing::debug!(
1878 record=?record.key,
1879 "Record stored: {} bytes",
1880 record.value.len()
1881 );
1882 self.queued_events.push_back(ToSwarm::GenerateEvent(
1883 Event::InboundRequest {
1884 request: InboundRequest::PutRecord {
1885 source,
1886 connection,
1887 record: None,
1888 },
1889 },
1890 ));
1891 }
1892 Err(e) => {
1893 tracing::info!("Record not stored: {:?}", e);
1894 self.queued_events.push_back(ToSwarm::NotifyHandler {
1895 peer_id: source,
1896 handler: NotifyHandler::One(connection),
1897 event: HandlerIn::Reset(request_id),
1898 });
1899
1900 return;
1901 }
1902 },
1903 StoreInserts::FilterBoth => {
1904 self.queued_events
1905 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1906 request: InboundRequest::PutRecord {
1907 source,
1908 connection,
1909 record: Some(record.clone()),
1910 },
1911 }));
1912 }
1913 }
1914 }
1915
1916 // The remote receives a [`HandlerIn::PutRecordRes`] even in the
1917 // case where the record is discarded due to being expired. Given that
1918 // the remote sent the local node a [`HandlerEvent::PutRecord`]
1919 // request, the remote perceives the local node as one node among the k
1920 // closest nodes to the target. In addition returning
1921 // [`HandlerIn::PutRecordRes`] does not reveal any internal
1922 // information to a possibly malicious remote node.
1923 self.queued_events.push_back(ToSwarm::NotifyHandler {
1924 peer_id: source,
1925 handler: NotifyHandler::One(connection),
1926 event: HandlerIn::PutRecordRes {
1927 key: record.key,
1928 value: record.value,
1929 request_id,
1930 },
1931 })
1932 }
1933
1934 /// Processes a provider record received from a peer.
1935 fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1936 if &provider.node_id != self.kbuckets.local_key().preimage() {
1937 let record = ProviderRecord {
1938 key,
1939 provider: provider.node_id,
1940 expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1941 addresses: provider.multiaddrs,
1942 };
1943 match self.record_filtering {
1944 StoreInserts::Unfiltered => {
1945 if let Err(e) = self.store.add_provider(record) {
1946 tracing::info!("Provider record not stored: {:?}", e);
1947 return;
1948 }
1949
1950 self.queued_events
1951 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1952 request: InboundRequest::AddProvider { record: None },
1953 }));
1954 }
1955 StoreInserts::FilterBoth => {
1956 self.queued_events
1957 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1958 request: InboundRequest::AddProvider {
1959 record: Some(record),
1960 },
1961 }));
1962 }
1963 }
1964 }
1965 }
1966
1967 fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1968 let key = kbucket::Key::from(peer_id);
1969
1970 if let Some(addrs) = self.kbuckets.entry(&key).as_mut().and_then(|e| e.value()) {
1971 // TODO: Ideally, the address should only be removed if the error can
1972 // be classified as "permanent" but since `err` is currently a borrowed
1973 // trait object without a `'static` bound, even downcasting for inspection
1974 // of the error is not possible (and also not truly desirable or ergonomic).
1975 // The error passed in should rather be a dedicated enum.
1976 if addrs.remove(address).is_ok() {
1977 tracing::debug!(
1978 peer=%peer_id,
1979 %address,
1980 "Address removed from peer due to error."
1981 );
1982 } else {
1983 // Despite apparently having no reachable address (any longer),
1984 // the peer is kept in the routing table with the last address to avoid
1985 // (temporary) loss of network connectivity to "flush" the routing
1986 // table. Once in, a peer is only removed from the routing table
1987 // if it is the least recently connected peer, currently disconnected
1988 // and is unreachable in the context of another peer pending insertion
1989 // into the same bucket. This is handled transparently by the
1990 // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
1991 // within `Behaviour::poll`.
1992 tracing::debug!(
1993 peer=%peer_id,
1994 %address,
1995 "Last remaining address of peer is unreachable."
1996 );
1997 }
1998 }
1999
2000 for query in self.queries.iter_mut() {
2001 if let Some(addrs) = query.peers.addresses.get_mut(&peer_id) {
2002 addrs.retain(|a| a != address);
2003 }
2004 }
2005 }
2006
2007 fn on_connection_established(
2008 &mut self,
2009 ConnectionEstablished {
2010 peer_id,
2011 failed_addresses,
2012 other_established,
2013 ..
2014 }: ConnectionEstablished,
2015 ) {
2016 for addr in failed_addresses {
2017 self.address_failed(peer_id, addr);
2018 }
2019
2020 // Peer's first connection.
2021 if other_established == 0 {
2022 self.connected_peers.insert(peer_id);
2023 }
2024 }
2025
2026 fn on_address_change(
2027 &mut self,
2028 AddressChange {
2029 peer_id: peer,
2030 old,
2031 new,
2032 ..
2033 }: AddressChange,
2034 ) {
2035 let (old, new) = (old.get_remote_address(), new.get_remote_address());
2036
2037 // Update routing table.
2038 if let Some(addrs) = self
2039 .kbuckets
2040 .entry(&kbucket::Key::from(peer))
2041 .as_mut()
2042 .and_then(|e| e.value())
2043 {
2044 if addrs.replace(old, new) {
2045 tracing::debug!(
2046 %peer,
2047 old_address=%old,
2048 new_address=%new,
2049 "Old address replaced with new address for peer."
2050 );
2051 } else {
2052 tracing::debug!(
2053 %peer,
2054 old_address=%old,
2055 new_address=%new,
2056 "Old address not replaced with new address for peer as old address wasn't present.",
2057 );
2058 }
2059 } else {
2060 tracing::debug!(
2061 %peer,
2062 old_address=%old,
2063 new_address=%new,
2064 "Old address not replaced with new address for peer as peer is not present in the \
2065 routing table."
2066 );
2067 }
2068
2069 // Update query address cache.
2070 //
2071 // Given two connected nodes: local node A and remote node B. Say node B
2072 // is not in node A's routing table. Additionally node B is part of the
2073 // `Query::addresses` list of an ongoing query on node A. Say Node
2074 // B triggers an address change and then disconnects. Later on the
2075 // earlier mentioned query on node A would like to connect to node B.
2076 // Without replacing the address in the `Query::addresses` set node
2077 // A would attempt to dial the old and not the new address.
2078 //
2079 // While upholding correctness, iterating through all discovered
2080 // addresses of a peer in all currently ongoing queries might have a
2081 // large performance impact. If so, the code below might be worth
2082 // revisiting.
2083 for query in self.queries.iter_mut() {
2084 if let Some(addrs) = query.peers.addresses.get_mut(&peer) {
2085 for addr in addrs.iter_mut() {
2086 if addr == old {
2087 *addr = new.clone();
2088 }
2089 }
2090 }
2091 }
2092 }
2093
2094 fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
2095 let Some(peer_id) = peer_id else { return };
2096
2097 match error {
2098 DialError::LocalPeerId { .. }
2099 | DialError::WrongPeerId { .. }
2100 | DialError::Aborted
2101 | DialError::Denied { .. }
2102 | DialError::Transport(_)
2103 | DialError::NoAddresses => {
2104 if let DialError::Transport(addresses) = error {
2105 for (addr, _) in addresses {
2106 self.address_failed(peer_id, addr)
2107 }
2108 }
2109
2110 for query in self.queries.iter_mut() {
2111 query.on_failure(&peer_id);
2112 }
2113 }
2114 DialError::DialPeerConditionFalse(
2115 dial_opts::PeerCondition::Disconnected
2116 | dial_opts::PeerCondition::NotDialing
2117 | dial_opts::PeerCondition::DisconnectedAndNotDialing,
2118 ) => {
2119 // We might (still) be connected, or about to be connected, thus do not report the
2120 // failure to the queries.
2121 }
2122 DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2123 unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
2124 }
2125 }
2126 }
2127
2128 fn on_connection_closed(
2129 &mut self,
2130 ConnectionClosed {
2131 peer_id,
2132 remaining_established,
2133 connection_id,
2134 ..
2135 }: ConnectionClosed,
2136 ) {
2137 self.connections.remove(&connection_id);
2138
2139 if remaining_established == 0 {
2140 for query in self.queries.iter_mut() {
2141 query.on_failure(&peer_id);
2142 }
2143 self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2144 self.connected_peers.remove(&peer_id);
2145 }
2146 }
2147
2148 /// Preloads a new [`Handler`] with requests that are waiting
2149 /// to be sent to the newly connected peer.
2150 fn preload_new_handler(
2151 &mut self,
2152 handler: &mut Handler,
2153 connection_id: ConnectionId,
2154 peer: PeerId,
2155 ) {
2156 self.connections.insert(connection_id, peer);
2157 // Queue events for sending pending RPCs to the connected peer.
2158 // There can be only one pending RPC for a particular peer and query per definition.
2159 for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2160 q.pending_rpcs
2161 .iter()
2162 .position(|(p, _)| p == &peer)
2163 .map(|p| q.pending_rpcs.remove(p))
2164 }) {
2165 handler.on_behaviour_event(event)
2166 }
2167 }
2168}
2169
2170/// Exponentially decrease the given duration (base 2).
2171fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
2172 Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
2173}
2174
2175impl<TStore> NetworkBehaviour for Behaviour<TStore>
2176where
2177 TStore: RecordStore + Send + 'static,
2178{
2179 type ConnectionHandler = Handler;
2180 type ToSwarm = Event;
2181
2182 fn handle_established_inbound_connection(
2183 &mut self,
2184 connection_id: ConnectionId,
2185 peer: PeerId,
2186 local_addr: &Multiaddr,
2187 remote_addr: &Multiaddr,
2188 ) -> Result<THandler<Self>, ConnectionDenied> {
2189 let connected_point = ConnectedPoint::Listener {
2190 local_addr: local_addr.clone(),
2191 send_back_addr: remote_addr.clone(),
2192 };
2193
2194 let mut handler = Handler::new(
2195 self.protocol_config.clone(),
2196 connected_point,
2197 peer,
2198 self.mode,
2199 );
2200 self.preload_new_handler(&mut handler, connection_id, peer);
2201
2202 Ok(handler)
2203 }
2204
2205 fn handle_established_outbound_connection(
2206 &mut self,
2207 connection_id: ConnectionId,
2208 peer: PeerId,
2209 addr: &Multiaddr,
2210 role_override: Endpoint,
2211 port_use: PortUse,
2212 ) -> Result<THandler<Self>, ConnectionDenied> {
2213 let connected_point = ConnectedPoint::Dialer {
2214 address: addr.clone(),
2215 role_override,
2216 port_use,
2217 };
2218
2219 let mut handler = Handler::new(
2220 self.protocol_config.clone(),
2221 connected_point,
2222 peer,
2223 self.mode,
2224 );
2225 self.preload_new_handler(&mut handler, connection_id, peer);
2226
2227 Ok(handler)
2228 }
2229
2230 fn handle_pending_outbound_connection(
2231 &mut self,
2232 _connection_id: ConnectionId,
2233 maybe_peer: Option<PeerId>,
2234 _addresses: &[Multiaddr],
2235 _effective_role: Endpoint,
2236 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
2237 let peer_id = match maybe_peer {
2238 None => return Ok(vec![]),
2239 Some(peer) => peer,
2240 };
2241
2242 // We should order addresses from decreasing likelihood of connectivity, so start with
2243 // the addresses of that peer in the k-buckets.
2244 let key = kbucket::Key::from(peer_id);
2245 let mut peer_addrs =
2246 if let Some(kbucket::Entry::Present(mut entry, _)) = self.kbuckets.entry(&key) {
2247 let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
2248 debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
2249 addrs
2250 } else {
2251 Vec::new()
2252 };
2253
2254 // We add to that a temporary list of addresses from the ongoing queries.
2255 for query in self.queries.iter() {
2256 if let Some(addrs) = query.peers.addresses.get(&peer_id) {
2257 peer_addrs.extend(addrs.iter().cloned())
2258 }
2259 }
2260
2261 Ok(peer_addrs)
2262 }
2263
2264 fn on_connection_handler_event(
2265 &mut self,
2266 source: PeerId,
2267 connection: ConnectionId,
2268 event: THandlerOutEvent<Self>,
2269 ) {
2270 match event {
2271 HandlerEvent::ProtocolConfirmed { endpoint } => {
2272 debug_assert!(self.connected_peers.contains(&source));
2273 // The remote's address can only be put into the routing table,
2274 // and thus shared with other nodes, if the local node is the dialer,
2275 // since the remote address on an inbound connection may be specific
2276 // to that connection (e.g. typically the TCP port numbers).
2277 let address = match endpoint {
2278 ConnectedPoint::Dialer { address, .. } => Some(address),
2279 ConnectedPoint::Listener { .. } => None,
2280 };
2281
2282 self.connection_updated(source, address, NodeStatus::Connected);
2283 }
2284
2285 HandlerEvent::ProtocolNotSupported { endpoint } => {
2286 let address = match endpoint {
2287 ConnectedPoint::Dialer { address, .. } => Some(address),
2288 ConnectedPoint::Listener { .. } => None,
2289 };
2290 self.connection_updated(source, address, NodeStatus::Disconnected);
2291 }
2292
2293 HandlerEvent::FindNodeReq { key, request_id } => {
2294 let closer_peers = self
2295 .find_closest_local_peers(&kbucket::Key::new(key), &source)
2296 .collect::<Vec<_>>();
2297
2298 self.queued_events
2299 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2300 request: InboundRequest::FindNode {
2301 num_closer_peers: closer_peers.len(),
2302 },
2303 }));
2304
2305 self.queued_events.push_back(ToSwarm::NotifyHandler {
2306 peer_id: source,
2307 handler: NotifyHandler::One(connection),
2308 event: HandlerIn::FindNodeRes {
2309 closer_peers,
2310 request_id,
2311 },
2312 });
2313 }
2314
2315 HandlerEvent::FindNodeRes {
2316 closer_peers,
2317 query_id,
2318 } => {
2319 self.discovered(&query_id, &source, closer_peers.iter());
2320 }
2321
2322 HandlerEvent::GetProvidersReq { key, request_id } => {
2323 let provider_peers = self.provider_peers(&key, &source);
2324 let closer_peers = self
2325 .find_closest_local_peers(&kbucket::Key::new(key), &source)
2326 .collect::<Vec<_>>();
2327
2328 self.queued_events
2329 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2330 request: InboundRequest::GetProvider {
2331 num_closer_peers: closer_peers.len(),
2332 num_provider_peers: provider_peers.len(),
2333 },
2334 }));
2335
2336 self.queued_events.push_back(ToSwarm::NotifyHandler {
2337 peer_id: source,
2338 handler: NotifyHandler::One(connection),
2339 event: HandlerIn::GetProvidersRes {
2340 closer_peers,
2341 provider_peers,
2342 request_id,
2343 },
2344 });
2345 }
2346
2347 HandlerEvent::GetProvidersRes {
2348 closer_peers,
2349 provider_peers,
2350 query_id,
2351 } => {
2352 let peers = closer_peers.iter().chain(provider_peers.iter());
2353 self.discovered(&query_id, &source, peers);
2354 if let Some(query) = self.queries.get_mut(&query_id) {
2355 let stats = query.stats().clone();
2356 if let QueryInfo::GetProviders {
2357 ref key,
2358 ref mut providers_found,
2359 ref mut step,
2360 ..
2361 } = query.info
2362 {
2363 *providers_found += provider_peers.len();
2364 let providers = provider_peers.iter().map(|p| p.node_id).collect();
2365
2366 self.queued_events.push_back(ToSwarm::GenerateEvent(
2367 Event::OutboundQueryProgressed {
2368 id: query_id,
2369 result: QueryResult::GetProviders(Ok(
2370 GetProvidersOk::FoundProviders {
2371 key: key.clone(),
2372 providers,
2373 },
2374 )),
2375 step: step.clone(),
2376 stats,
2377 },
2378 ));
2379 *step = step.next();
2380 }
2381 }
2382 }
2383 HandlerEvent::QueryError { query_id, error } => {
2384 tracing::debug!(
2385 peer=%source,
2386 query=?query_id,
2387 "Request to peer in query failed with {:?}",
2388 error
2389 );
2390 // If the query to which the error relates is still active,
2391 // signal the failure w.r.t. `source`.
2392 if let Some(query) = self.queries.get_mut(&query_id) {
2393 query.on_failure(&source)
2394 }
2395 }
2396
2397 HandlerEvent::AddProvider { key, provider } => {
2398 // Only accept a provider record from a legitimate peer.
2399 if provider.node_id != source {
2400 return;
2401 }
2402
2403 self.provider_received(key, provider);
2404 }
2405
2406 HandlerEvent::GetRecord { key, request_id } => {
2407 // Lookup the record locally.
2408 let record = match self.store.get(&key) {
2409 Some(record) => {
2410 if record.is_expired(Instant::now()) {
2411 self.store.remove(&key);
2412 None
2413 } else {
2414 Some(record.into_owned())
2415 }
2416 }
2417 None => None,
2418 };
2419
2420 let closer_peers = self
2421 .find_closest_local_peers(&kbucket::Key::new(key), &source)
2422 .collect::<Vec<_>>();
2423
2424 self.queued_events
2425 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2426 request: InboundRequest::GetRecord {
2427 num_closer_peers: closer_peers.len(),
2428 present_locally: record.is_some(),
2429 },
2430 }));
2431
2432 self.queued_events.push_back(ToSwarm::NotifyHandler {
2433 peer_id: source,
2434 handler: NotifyHandler::One(connection),
2435 event: HandlerIn::GetRecordRes {
2436 record,
2437 closer_peers,
2438 request_id,
2439 },
2440 });
2441 }
2442
2443 HandlerEvent::GetRecordRes {
2444 record,
2445 closer_peers,
2446 query_id,
2447 } => {
2448 if let Some(query) = self.queries.get_mut(&query_id) {
2449 let stats = query.stats().clone();
2450 if let QueryInfo::GetRecord {
2451 key,
2452 ref mut step,
2453 ref mut found_a_record,
2454 cache_candidates,
2455 } = &mut query.info
2456 {
2457 if let Some(record) = record {
2458 *found_a_record = true;
2459 let record = PeerRecord {
2460 peer: Some(source),
2461 record,
2462 };
2463
2464 self.queued_events.push_back(ToSwarm::GenerateEvent(
2465 Event::OutboundQueryProgressed {
2466 id: query_id,
2467 result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
2468 record,
2469 ))),
2470 step: step.clone(),
2471 stats,
2472 },
2473 ));
2474
2475 *step = step.next();
2476 } else {
2477 tracing::trace!(record=?key, %source, "Record not found at source");
2478 if let Caching::Enabled { max_peers } = self.caching {
2479 let source_key = kbucket::Key::from(source);
2480 let target_key = kbucket::Key::from(key.clone());
2481 let distance = source_key.distance(&target_key);
2482 cache_candidates.insert(distance, source);
2483 if cache_candidates.len() > max_peers as usize {
2484 // TODO: `pop_last()` would be nice once stabilised.
2485 // See https://github.com/rust-lang/rust/issues/62924.
2486 let last =
2487 *cache_candidates.keys().next_back().expect("len > 0");
2488 cache_candidates.remove(&last);
2489 }
2490 }
2491 }
2492 }
2493 }
2494
2495 self.discovered(&query_id, &source, closer_peers.iter());
2496 }
2497
2498 HandlerEvent::PutRecord { record, request_id } => {
2499 self.record_received(source, connection, request_id, record);
2500 }
2501
2502 HandlerEvent::PutRecordRes { query_id, .. } => {
2503 if let Some(query) = self.queries.get_mut(&query_id) {
2504 query.on_success(&source, vec![]);
2505 if let QueryInfo::PutRecord {
2506 phase: PutRecordPhase::PutRecord { success, .. },
2507 quorum,
2508 ..
2509 } = &mut query.info
2510 {
2511 success.push(source);
2512
2513 let quorum = quorum.get();
2514 if success.len() >= quorum {
2515 let peers = success.clone();
2516 let finished = query.try_finish(peers.iter());
2517 if !finished {
2518 tracing::debug!(
2519 peer=%source,
2520 query=?query_id,
2521 "PutRecord query reached quorum ({}/{}) with response \
2522 from peer but could not yet finish.",
2523 peers.len(),
2524 quorum,
2525 );
2526 }
2527 }
2528 }
2529 }
2530 }
2531 };
2532 }
2533
2534 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
2535 fn poll(
2536 &mut self,
2537 cx: &mut Context<'_>,
2538 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
2539 let now = Instant::now();
2540
2541 // Calculate the available capacity for queries triggered by background jobs.
2542 let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
2543
2544 // Run the periodic provider announcement job.
2545 if let Some(mut job) = self.add_provider_job.take() {
2546 let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2547 for i in 0..num {
2548 if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2549 self.start_add_provider(r.key, AddProviderContext::Republish)
2550 } else {
2551 jobs_query_capacity -= i;
2552 break;
2553 }
2554 }
2555 self.add_provider_job = Some(job);
2556 }
2557
2558 // Run the periodic record replication / publication job.
2559 if let Some(mut job) = self.put_record_job.take() {
2560 let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2561 for _ in 0..num {
2562 if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2563 let context =
2564 if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
2565 PutRecordContext::Republish
2566 } else {
2567 PutRecordContext::Replicate
2568 };
2569 self.start_put_record(r, Quorum::All, context)
2570 } else {
2571 break;
2572 }
2573 }
2574 self.put_record_job = Some(job);
2575 }
2576
2577 // Poll bootstrap periodically and automatically.
2578 if let Poll::Ready(()) = self.bootstrap_status.poll_next_bootstrap(cx) {
2579 if let Err(e) = self.bootstrap() {
2580 tracing::warn!("Failed to trigger bootstrap: {e}");
2581 }
2582 }
2583
2584 loop {
2585 // Drain queued events first.
2586 if let Some(event) = self.queued_events.pop_front() {
2587 return Poll::Ready(event);
2588 }
2589
2590 // Drain applied pending entries from the routing table.
2591 if let Some(entry) = self.kbuckets.take_applied_pending() {
2592 let kbucket::Node { key, value } = entry.inserted;
2593 let peer_id = key.into_preimage();
2594 self.queued_events
2595 .push_back(ToSwarm::NewExternalAddrOfPeer {
2596 peer_id,
2597 address: value.first().clone(),
2598 });
2599 let event = Event::RoutingUpdated {
2600 bucket_range: self
2601 .kbuckets
2602 .bucket(&key)
2603 .map(|b| b.range())
2604 .expect("Self to never be applied from pending."),
2605 peer: peer_id,
2606 is_new_peer: true,
2607 addresses: value,
2608 old_peer: entry.evicted.map(|n| n.key.into_preimage()),
2609 };
2610 return Poll::Ready(ToSwarm::GenerateEvent(event));
2611 }
2612
2613 // Look for a finished query.
2614 loop {
2615 match self.queries.poll(now) {
2616 QueryPoolState::Finished(q) => {
2617 if let Some(event) = self.query_finished(q) {
2618 return Poll::Ready(ToSwarm::GenerateEvent(event));
2619 }
2620 }
2621 QueryPoolState::Timeout(q) => {
2622 if let Some(event) = self.query_timeout(q) {
2623 return Poll::Ready(ToSwarm::GenerateEvent(event));
2624 }
2625 }
2626 QueryPoolState::Waiting(Some((query, peer_id))) => {
2627 let event = query.info.to_request(query.id());
2628 // TODO: AddProvider requests yield no response, so the query completes
2629 // as soon as all requests have been sent. However, the handler should
2630 // better emit an event when the request has been sent (and report
2631 // an error if sending fails), instead of immediately reporting
2632 // "success" somewhat prematurely here.
2633 if let QueryInfo::AddProvider {
2634 phase: AddProviderPhase::AddProvider { .. },
2635 ..
2636 } = &query.info
2637 {
2638 query.on_success(&peer_id, vec![])
2639 }
2640
2641 if self.connected_peers.contains(&peer_id) {
2642 self.queued_events.push_back(ToSwarm::NotifyHandler {
2643 peer_id,
2644 event,
2645 handler: NotifyHandler::Any,
2646 });
2647 } else if &peer_id != self.kbuckets.local_key().preimage() {
2648 query.pending_rpcs.push((peer_id, event));
2649 self.queued_events.push_back(ToSwarm::Dial {
2650 opts: DialOpts::peer_id(peer_id).build(),
2651 });
2652 }
2653 }
2654 QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
2655 }
2656 }
2657
2658 // No immediate event was produced as a result of a finished query.
2659 // If no new events have been queued either, signal `NotReady` to
2660 // be polled again later.
2661 if self.queued_events.is_empty() {
2662 self.no_events_waker = Some(cx.waker().clone());
2663
2664 return Poll::Pending;
2665 }
2666 }
2667 }
2668
2669 fn on_swarm_event(&mut self, event: FromSwarm) {
2670 self.listen_addresses.on_swarm_event(&event);
2671 let external_addresses_changed = self.external_addresses.on_swarm_event(&event);
2672
2673 if self.auto_mode && external_addresses_changed {
2674 self.determine_mode_from_external_addresses();
2675 }
2676
2677 match event {
2678 FromSwarm::ConnectionEstablished(connection_established) => {
2679 self.on_connection_established(connection_established)
2680 }
2681 FromSwarm::ConnectionClosed(connection_closed) => {
2682 self.on_connection_closed(connection_closed)
2683 }
2684 FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
2685 FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
2686 FromSwarm::NewListenAddr(_) if self.connected_peers.is_empty() => {
2687 // A new listen addr was just discovered and we have no connected peers,
2688 // it can mean that our network interfaces were not up but they are now
2689 // so it might be a good idea to trigger a bootstrap.
2690 self.bootstrap_status.trigger();
2691 }
2692 _ => {}
2693 }
2694 }
2695}
2696
2697/// Peer Info combines a Peer ID with a set of multiaddrs that the peer is listening on.
2698#[derive(Debug, Clone, PartialEq, Eq)]
2699pub struct PeerInfo {
2700 pub peer_id: PeerId,
2701 pub addrs: Vec<Multiaddr>,
2702}
2703
2704/// A quorum w.r.t. the configured replication factor specifies the minimum
2705/// number of distinct nodes that must be successfully contacted in order
2706/// for a query to succeed.
2707#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2708pub enum Quorum {
2709 One,
2710 Majority,
2711 All,
2712 N(NonZeroUsize),
2713}
2714
2715impl Quorum {
2716 /// Evaluate the quorum w.r.t a given total (number of peers).
2717 fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
2718 match self {
2719 Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
2720 Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
2721 Quorum::All => total,
2722 Quorum::N(n) => NonZeroUsize::min(total, *n),
2723 }
2724 }
2725}
2726
2727/// A record either received by the given peer or retrieved from the local
2728/// record store.
2729#[derive(Debug, Clone, PartialEq, Eq)]
2730pub struct PeerRecord {
2731 /// The peer from whom the record was received. `None` if the record was
2732 /// retrieved from local storage.
2733 pub peer: Option<PeerId>,
2734 pub record: Record,
2735}
2736
2737//////////////////////////////////////////////////////////////////////////////
2738// Events
2739
2740/// The events produced by the `Kademlia` behaviour.
2741///
2742/// See [`NetworkBehaviour::poll`].
2743#[derive(Debug, Clone)]
2744#[allow(clippy::large_enum_variant)]
2745pub enum Event {
2746 /// An inbound request has been received and handled.
2747 // Note on the difference between 'request' and 'query': A request is a
2748 // single request-response style exchange with a single remote peer. A query
2749 // is made of multiple requests across multiple remote peers.
2750 InboundRequest { request: InboundRequest },
2751
2752 /// An outbound query has made progress.
2753 OutboundQueryProgressed {
2754 /// The ID of the query that finished.
2755 id: QueryId,
2756 /// The intermediate result of the query.
2757 result: QueryResult,
2758 /// Execution statistics from the query.
2759 stats: QueryStats,
2760 /// Indicates which event this is, if therer are multiple responses for a single query.
2761 step: ProgressStep,
2762 },
2763
2764 /// The routing table has been updated with a new peer and / or
2765 /// address, thereby possibly evicting another peer.
2766 RoutingUpdated {
2767 /// The ID of the peer that was added or updated.
2768 peer: PeerId,
2769 /// Whether this is a new peer and was thus just added to the routing
2770 /// table, or whether it is an existing peer who's addresses changed.
2771 is_new_peer: bool,
2772 /// The full list of known addresses of `peer`.
2773 addresses: Addresses,
2774 /// Returns the minimum inclusive and maximum inclusive distance for
2775 /// the bucket of the peer.
2776 bucket_range: (Distance, Distance),
2777 /// The ID of the peer that was evicted from the routing table to make
2778 /// room for the new peer, if any.
2779 old_peer: Option<PeerId>,
2780 },
2781
2782 /// A peer has connected for whom no listen address is known.
2783 ///
2784 /// If the peer is to be added to the routing table, a known
2785 /// listen address for the peer must be provided via [`Behaviour::add_address`].
2786 UnroutablePeer { peer: PeerId },
2787
2788 /// A connection to a peer has been established for whom a listen address
2789 /// is known but the peer has not been added to the routing table either
2790 /// because [`BucketInserts::Manual`] is configured or because
2791 /// the corresponding bucket is full.
2792 ///
2793 /// If the peer is to be included in the routing table, it must
2794 /// must be explicitly added via [`Behaviour::add_address`], possibly after
2795 /// removing another peer.
2796 ///
2797 /// See [`Behaviour::kbucket`] for insight into the contents of
2798 /// the k-bucket of `peer`.
2799 RoutablePeer { peer: PeerId, address: Multiaddr },
2800
2801 /// A connection to a peer has been established for whom a listen address
2802 /// is known but the peer is only pending insertion into the routing table
2803 /// if the least-recently disconnected peer is unresponsive, i.e. the peer
2804 /// may not make it into the routing table.
2805 ///
2806 /// If the peer is to be unconditionally included in the routing table,
2807 /// it should be explicitly added via [`Behaviour::add_address`] after
2808 /// removing another peer.
2809 ///
2810 /// See [`Behaviour::kbucket`] for insight into the contents of
2811 /// the k-bucket of `peer`.
2812 PendingRoutablePeer { peer: PeerId, address: Multiaddr },
2813
2814 /// This peer's mode has been updated automatically.
2815 ///
2816 /// This happens in response to an external
2817 /// address being added or removed.
2818 ModeChanged { new_mode: Mode },
2819}
2820
2821/// Information about progress events.
2822#[derive(Debug, Clone)]
2823pub struct ProgressStep {
2824 /// The index into the event
2825 pub count: NonZeroUsize,
2826 /// Is this the final event?
2827 pub last: bool,
2828}
2829
2830impl ProgressStep {
2831 fn first() -> Self {
2832 Self {
2833 count: NonZeroUsize::new(1).expect("1 to be greater than 0."),
2834 last: false,
2835 }
2836 }
2837
2838 fn first_and_last() -> Self {
2839 let mut first = ProgressStep::first();
2840 first.last = true;
2841 first
2842 }
2843
2844 fn next(&self) -> Self {
2845 assert!(!self.last);
2846 let count = NonZeroUsize::new(self.count.get() + 1).expect("Adding 1 not to result in 0.");
2847
2848 Self { count, last: false }
2849 }
2850}
2851
2852/// Information about a received and handled inbound request.
2853#[derive(Debug, Clone)]
2854pub enum InboundRequest {
2855 /// Request for the list of nodes whose IDs are the closest to `key`.
2856 FindNode { num_closer_peers: usize },
2857 /// Same as `FindNode`, but should also return the entries of the local
2858 /// providers list for this key.
2859 GetProvider {
2860 num_closer_peers: usize,
2861 num_provider_peers: usize,
2862 },
2863 /// A peer sent an add provider request.
2864 /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
2865 /// included.
2866 ///
2867 /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details..
2868 AddProvider { record: Option<ProviderRecord> },
2869 /// Request to retrieve a record.
2870 GetRecord {
2871 num_closer_peers: usize,
2872 present_locally: bool,
2873 },
2874 /// A peer sent a put record request.
2875 /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
2876 ///
2877 /// See [`StoreInserts`] and [`Config::set_record_filtering`].
2878 PutRecord {
2879 source: PeerId,
2880 connection: ConnectionId,
2881 record: Option<Record>,
2882 },
2883}
2884
2885/// The results of Kademlia queries.
2886#[derive(Debug, Clone)]
2887pub enum QueryResult {
2888 /// The result of [`Behaviour::bootstrap`].
2889 Bootstrap(BootstrapResult),
2890
2891 /// The result of [`Behaviour::get_closest_peers`].
2892 GetClosestPeers(GetClosestPeersResult),
2893
2894 /// The result of [`Behaviour::get_providers`].
2895 GetProviders(GetProvidersResult),
2896
2897 /// The result of [`Behaviour::start_providing`].
2898 StartProviding(AddProviderResult),
2899
2900 /// The result of a (automatic) republishing of a provider record.
2901 RepublishProvider(AddProviderResult),
2902
2903 /// The result of [`Behaviour::get_record`].
2904 GetRecord(GetRecordResult),
2905
2906 /// The result of [`Behaviour::put_record`].
2907 PutRecord(PutRecordResult),
2908
2909 /// The result of a (automatic) republishing of a (value-)record.
2910 RepublishRecord(PutRecordResult),
2911}
2912
2913/// The result of [`Behaviour::get_record`].
2914pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
2915
2916/// The successful result of [`Behaviour::get_record`].
2917#[derive(Debug, Clone)]
2918pub enum GetRecordOk {
2919 FoundRecord(PeerRecord),
2920 FinishedWithNoAdditionalRecord {
2921 /// If caching is enabled, these are the peers closest
2922 /// _to the record key_ (not the local node) that were queried but
2923 /// did not return the record, sorted by distance to the record key
2924 /// from closest to farthest. How many of these are tracked is configured
2925 /// by [`Config::set_caching`].
2926 ///
2927 /// Writing back the cache at these peers is a manual operation.
2928 /// ie. you may wish to use these candidates with [`Behaviour::put_record_to`]
2929 /// after selecting one of the returned records.
2930 cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
2931 },
2932}
2933
2934/// The error result of [`Behaviour::get_record`].
2935#[derive(Debug, Clone, Error)]
2936pub enum GetRecordError {
2937 #[error("the record was not found")]
2938 NotFound {
2939 key: record::Key,
2940 closest_peers: Vec<PeerId>,
2941 },
2942 #[error("the quorum failed; needed {quorum} peers")]
2943 QuorumFailed {
2944 key: record::Key,
2945 records: Vec<PeerRecord>,
2946 quorum: NonZeroUsize,
2947 },
2948 #[error("the request timed out")]
2949 Timeout { key: record::Key },
2950}
2951
2952impl GetRecordError {
2953 /// Gets the key of the record for which the operation failed.
2954 pub fn key(&self) -> &record::Key {
2955 match self {
2956 GetRecordError::QuorumFailed { key, .. } => key,
2957 GetRecordError::Timeout { key, .. } => key,
2958 GetRecordError::NotFound { key, .. } => key,
2959 }
2960 }
2961
2962 /// Extracts the key of the record for which the operation failed,
2963 /// consuming the error.
2964 pub fn into_key(self) -> record::Key {
2965 match self {
2966 GetRecordError::QuorumFailed { key, .. } => key,
2967 GetRecordError::Timeout { key, .. } => key,
2968 GetRecordError::NotFound { key, .. } => key,
2969 }
2970 }
2971}
2972
2973/// The result of [`Behaviour::put_record`].
2974pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
2975
2976/// The successful result of [`Behaviour::put_record`].
2977#[derive(Debug, Clone)]
2978pub struct PutRecordOk {
2979 pub key: record::Key,
2980}
2981
2982/// The error result of [`Behaviour::put_record`].
2983#[derive(Debug, Clone, Error)]
2984pub enum PutRecordError {
2985 #[error("the quorum failed; needed {quorum} peers")]
2986 QuorumFailed {
2987 key: record::Key,
2988 /// [`PeerId`]s of the peers the record was successfully stored on.
2989 success: Vec<PeerId>,
2990 quorum: NonZeroUsize,
2991 },
2992 #[error("the request timed out")]
2993 Timeout {
2994 key: record::Key,
2995 /// [`PeerId`]s of the peers the record was successfully stored on.
2996 success: Vec<PeerId>,
2997 quorum: NonZeroUsize,
2998 },
2999}
3000
3001impl PutRecordError {
3002 /// Gets the key of the record for which the operation failed.
3003 pub fn key(&self) -> &record::Key {
3004 match self {
3005 PutRecordError::QuorumFailed { key, .. } => key,
3006 PutRecordError::Timeout { key, .. } => key,
3007 }
3008 }
3009
3010 /// Extracts the key of the record for which the operation failed,
3011 /// consuming the error.
3012 pub fn into_key(self) -> record::Key {
3013 match self {
3014 PutRecordError::QuorumFailed { key, .. } => key,
3015 PutRecordError::Timeout { key, .. } => key,
3016 }
3017 }
3018}
3019
3020/// The result of [`Behaviour::bootstrap`].
3021pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
3022
3023/// The successful result of [`Behaviour::bootstrap`].
3024#[derive(Debug, Clone)]
3025pub struct BootstrapOk {
3026 pub peer: PeerId,
3027 pub num_remaining: u32,
3028}
3029
3030/// The error result of [`Behaviour::bootstrap`].
3031#[derive(Debug, Clone, Error)]
3032pub enum BootstrapError {
3033 #[error("the request timed out")]
3034 Timeout {
3035 peer: PeerId,
3036 num_remaining: Option<u32>,
3037 },
3038}
3039
3040/// The result of [`Behaviour::get_closest_peers`].
3041pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
3042
3043/// The successful result of [`Behaviour::get_closest_peers`].
3044#[derive(Debug, Clone)]
3045pub struct GetClosestPeersOk {
3046 pub key: Vec<u8>,
3047 pub peers: Vec<PeerInfo>,
3048}
3049
3050/// The error result of [`Behaviour::get_closest_peers`].
3051#[derive(Debug, Clone, Error)]
3052pub enum GetClosestPeersError {
3053 #[error("the request timed out")]
3054 Timeout { key: Vec<u8>, peers: Vec<PeerInfo> },
3055}
3056
3057impl GetClosestPeersError {
3058 /// Gets the key for which the operation failed.
3059 pub fn key(&self) -> &Vec<u8> {
3060 match self {
3061 GetClosestPeersError::Timeout { key, .. } => key,
3062 }
3063 }
3064
3065 /// Extracts the key for which the operation failed,
3066 /// consuming the error.
3067 pub fn into_key(self) -> Vec<u8> {
3068 match self {
3069 GetClosestPeersError::Timeout { key, .. } => key,
3070 }
3071 }
3072}
3073
3074/// The result of [`Behaviour::get_providers`].
3075pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
3076
3077/// The successful result of [`Behaviour::get_providers`].
3078#[derive(Debug, Clone)]
3079pub enum GetProvidersOk {
3080 FoundProviders {
3081 key: record::Key,
3082 /// The new set of providers discovered.
3083 providers: HashSet<PeerId>,
3084 },
3085 FinishedWithNoAdditionalRecord {
3086 closest_peers: Vec<PeerId>,
3087 },
3088}
3089
3090/// The error result of [`Behaviour::get_providers`].
3091#[derive(Debug, Clone, Error)]
3092pub enum GetProvidersError {
3093 #[error("the request timed out")]
3094 Timeout {
3095 key: record::Key,
3096 closest_peers: Vec<PeerId>,
3097 },
3098}
3099
3100impl GetProvidersError {
3101 /// Gets the key for which the operation failed.
3102 pub fn key(&self) -> &record::Key {
3103 match self {
3104 GetProvidersError::Timeout { key, .. } => key,
3105 }
3106 }
3107
3108 /// Extracts the key for which the operation failed,
3109 /// consuming the error.
3110 pub fn into_key(self) -> record::Key {
3111 match self {
3112 GetProvidersError::Timeout { key, .. } => key,
3113 }
3114 }
3115}
3116
3117/// The result of publishing a provider record.
3118pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
3119
3120/// The successful result of publishing a provider record.
3121#[derive(Debug, Clone)]
3122pub struct AddProviderOk {
3123 pub key: record::Key,
3124}
3125
3126/// The possible errors when publishing a provider record.
3127#[derive(Debug, Clone, Error)]
3128pub enum AddProviderError {
3129 #[error("the request timed out")]
3130 Timeout { key: record::Key },
3131}
3132
3133impl AddProviderError {
3134 /// Gets the key for which the operation failed.
3135 pub fn key(&self) -> &record::Key {
3136 match self {
3137 AddProviderError::Timeout { key, .. } => key,
3138 }
3139 }
3140
3141 /// Extracts the key for which the operation failed,
3142 pub fn into_key(self) -> record::Key {
3143 match self {
3144 AddProviderError::Timeout { key, .. } => key,
3145 }
3146 }
3147}
3148
3149impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3150 fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3151 KadPeer {
3152 node_id: e.node.key.into_preimage(),
3153 multiaddrs: e.node.value.into_vec(),
3154 connection_ty: match e.status {
3155 NodeStatus::Connected => ConnectionType::Connected,
3156 NodeStatus::Disconnected => ConnectionType::NotConnected,
3157 },
3158 }
3159 }
3160}
3161
3162/// The context of a [`QueryInfo::AddProvider`] query.
3163#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3164pub enum AddProviderContext {
3165 /// The context is a [`Behaviour::start_providing`] operation.
3166 Publish,
3167 /// The context is periodic republishing of provider announcements
3168 /// initiated earlier via [`Behaviour::start_providing`].
3169 Republish,
3170}
3171
3172/// The context of a [`QueryInfo::PutRecord`] query.
3173#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3174pub enum PutRecordContext {
3175 /// The context is a [`Behaviour::put_record`] operation.
3176 Publish,
3177 /// The context is periodic republishing of records stored
3178 /// earlier via [`Behaviour::put_record`].
3179 Republish,
3180 /// The context is periodic replication (i.e. without extending
3181 /// the record TTL) of stored records received earlier from another peer.
3182 Replicate,
3183 /// The context is a custom store operation targeting specific
3184 /// peers initiated by [`Behaviour::put_record_to`].
3185 Custom,
3186}
3187
3188/// Information about a running query.
3189#[derive(Debug, Clone)]
3190pub enum QueryInfo {
3191 /// A query initiated by [`Behaviour::bootstrap`].
3192 Bootstrap {
3193 /// The targeted peer ID.
3194 peer: PeerId,
3195 /// The remaining random peer IDs to query, one per
3196 /// bucket that still needs refreshing.
3197 ///
3198 /// This is `None` if the initial self-lookup has not
3199 /// yet completed and `Some` with an exhausted iterator
3200 /// if bootstrapping is complete.
3201 remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
3202 step: ProgressStep,
3203 },
3204
3205 /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
3206 GetClosestPeers {
3207 /// The key being queried (the preimage).
3208 key: Vec<u8>,
3209 /// Current index of events.
3210 step: ProgressStep,
3211 /// If required, `num_results` specifies expected responding peers
3212 num_results: Option<NonZeroUsize>,
3213 },
3214
3215 /// A (repeated) query initiated by [`Behaviour::get_providers`].
3216 GetProviders {
3217 /// The key for which to search for providers.
3218 key: record::Key,
3219 /// The number of providers found so far.
3220 providers_found: usize,
3221 /// Current index of events.
3222 step: ProgressStep,
3223 },
3224
3225 /// A (repeated) query initiated by [`Behaviour::start_providing`].
3226 AddProvider {
3227 /// The record key.
3228 key: record::Key,
3229 /// The current phase of the query.
3230 phase: AddProviderPhase,
3231 /// The execution context of the query.
3232 context: AddProviderContext,
3233 },
3234
3235 /// A (repeated) query initiated by [`Behaviour::put_record`].
3236 PutRecord {
3237 record: Record,
3238 /// The expected quorum of responses w.r.t. the replication factor.
3239 quorum: NonZeroUsize,
3240 /// The current phase of the query.
3241 phase: PutRecordPhase,
3242 /// The execution context of the query.
3243 context: PutRecordContext,
3244 },
3245
3246 /// A (repeated) query initiated by [`Behaviour::get_record`].
3247 GetRecord {
3248 /// The key to look for.
3249 key: record::Key,
3250 /// Current index of events.
3251 step: ProgressStep,
3252 /// Did we find at least one record?
3253 found_a_record: bool,
3254 /// The peers closest to the `key` that were queried but did not return a record,
3255 /// i.e. the peers that are candidates for caching the record.
3256 cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
3257 },
3258}
3259
3260impl QueryInfo {
3261 /// Creates an event for a handler to issue an outgoing request in the
3262 /// context of a query.
3263 fn to_request(&self, query_id: QueryId) -> HandlerIn {
3264 match &self {
3265 QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
3266 key: peer.to_bytes(),
3267 query_id,
3268 },
3269 QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
3270 key: key.clone(),
3271 query_id,
3272 },
3273 QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
3274 key: key.clone(),
3275 query_id,
3276 },
3277 QueryInfo::AddProvider { key, phase, .. } => match phase {
3278 AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3279 key: key.to_vec(),
3280 query_id,
3281 },
3282 AddProviderPhase::AddProvider {
3283 provider_id,
3284 external_addresses,
3285 ..
3286 } => HandlerIn::AddProvider {
3287 key: key.clone(),
3288 provider: crate::protocol::KadPeer {
3289 node_id: *provider_id,
3290 multiaddrs: external_addresses.clone(),
3291 connection_ty: crate::protocol::ConnectionType::Connected,
3292 },
3293 query_id,
3294 },
3295 },
3296 QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
3297 key: key.clone(),
3298 query_id,
3299 },
3300 QueryInfo::PutRecord { record, phase, .. } => match phase {
3301 PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3302 key: record.key.to_vec(),
3303 query_id,
3304 },
3305 PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
3306 record: record.clone(),
3307 query_id,
3308 },
3309 },
3310 }
3311 }
3312}
3313
3314/// The phases of a [`QueryInfo::AddProvider`] query.
3315#[derive(Debug, Clone)]
3316pub enum AddProviderPhase {
3317 /// The query is searching for the closest nodes to the record key.
3318 GetClosestPeers,
3319
3320 /// The query advertises the local node as a provider for the key to
3321 /// the closest nodes to the key.
3322 AddProvider {
3323 /// The local peer ID that is advertised as a provider.
3324 provider_id: PeerId,
3325 /// The external addresses of the provider being advertised.
3326 external_addresses: Vec<Multiaddr>,
3327 /// Query statistics from the finished `GetClosestPeers` phase.
3328 get_closest_peers_stats: QueryStats,
3329 },
3330}
3331
3332/// The phases of a [`QueryInfo::PutRecord`] query.
3333#[derive(Debug, Clone, PartialEq, Eq)]
3334pub enum PutRecordPhase {
3335 /// The query is searching for the closest nodes to the record key.
3336 GetClosestPeers,
3337
3338 /// The query is replicating the record to the closest nodes to the key.
3339 PutRecord {
3340 /// A list of peers the given record has been successfully replicated to.
3341 success: Vec<PeerId>,
3342 /// Query statistics from the finished `GetClosestPeers` phase.
3343 get_closest_peers_stats: QueryStats,
3344 },
3345}
3346
3347/// A mutable reference to a running query.
3348pub struct QueryMut<'a> {
3349 query: &'a mut Query,
3350}
3351
3352impl QueryMut<'_> {
3353 pub fn id(&self) -> QueryId {
3354 self.query.id()
3355 }
3356
3357 /// Gets information about the type and state of the query.
3358 pub fn info(&self) -> &QueryInfo {
3359 &self.query.info
3360 }
3361
3362 /// Gets execution statistics about the query.
3363 ///
3364 /// For a multi-phase query such as `put_record`, these are the
3365 /// statistics of the current phase.
3366 pub fn stats(&self) -> &QueryStats {
3367 self.query.stats()
3368 }
3369
3370 /// Finishes the query asap, without waiting for the
3371 /// regular termination conditions.
3372 pub fn finish(&mut self) {
3373 self.query.finish()
3374 }
3375}
3376
3377/// An immutable reference to a running query.
3378pub struct QueryRef<'a> {
3379 query: &'a Query,
3380}
3381
3382impl QueryRef<'_> {
3383 pub fn id(&self) -> QueryId {
3384 self.query.id()
3385 }
3386
3387 /// Gets information about the type and state of the query.
3388 pub fn info(&self) -> &QueryInfo {
3389 &self.query.info
3390 }
3391
3392 /// Gets execution statistics about the query.
3393 ///
3394 /// For a multi-phase query such as `put_record`, these are the
3395 /// statistics of the current phase.
3396 pub fn stats(&self) -> &QueryStats {
3397 self.query.stats()
3398 }
3399}
3400
3401/// An operation failed to due no known peers in the routing table.
3402#[derive(Debug, Clone)]
3403pub struct NoKnownPeers();
3404
3405impl fmt::Display for NoKnownPeers {
3406 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3407 write!(f, "No known peers.")
3408 }
3409}
3410
3411impl std::error::Error for NoKnownPeers {}
3412
3413/// The possible outcomes of [`Behaviour::add_address`].
3414#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3415pub enum RoutingUpdate {
3416 /// The given peer and address has been added to the routing
3417 /// table.
3418 Success,
3419 /// The peer and address is pending insertion into
3420 /// the routing table, if a disconnected peer fails
3421 /// to respond. If the given peer and address ends up
3422 /// in the routing table, [`Event::RoutingUpdated`]
3423 /// is eventually emitted.
3424 Pending,
3425 /// The routing table update failed, either because the
3426 /// corresponding bucket for the peer is full and the
3427 /// pending slot(s) are occupied, or because the given
3428 /// peer ID is deemed invalid (e.g. refers to the local
3429 /// peer ID).
3430 Failed,
3431}
3432
3433#[derive(PartialEq, Copy, Clone, Debug)]
3434pub enum Mode {
3435 Client,
3436 Server,
3437}
3438
3439impl fmt::Display for Mode {
3440 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3441 match self {
3442 Mode::Client => write!(f, "client"),
3443 Mode::Server => write!(f, "server"),
3444 }
3445 }
3446}
3447
3448fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
3449where
3450 T: ToString,
3451{
3452 confirmed_external_addresses
3453 .iter()
3454 .map(|addr| addr.to_string())
3455 .collect::<Vec<_>>()
3456 .join(", ")
3457}