libp2p_kad/behaviour.rs

// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Implementation of the `Kademlia` network behaviour.

mod test;

use std::{
    collections::{BTreeMap, HashMap, HashSet, VecDeque},
    fmt,
    num::NonZeroUsize,
    task::{Context, Poll, Waker},
    time::Duration,
    vec,
};

use fnv::FnvHashSet;
use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
    behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
    dial_opts::{self, DialOpts},
    ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
    ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent,
    THandlerOutEvent, ToSwarm,
};
use thiserror::Error;
use tracing::Level;
use web_time::Instant;

pub use crate::query::QueryStats;
use crate::{
    addresses::Addresses,
    bootstrap,
    handler::{Handler, HandlerEvent, HandlerIn, RequestId},
    jobs::*,
    kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus},
    protocol,
    protocol::{ConnectionType, KadPeer, ProtocolConfig},
    query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState},
    record::{
        self,
        store::{self, RecordStore},
        ProviderRecord, Record,
    },
    K_VALUE,
};

/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
pub struct Behaviour<TStore> {
    /// The Kademlia routing table.
    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,

    /// The k-bucket insertion strategy.
    kbucket_inserts: BucketInserts,

    /// Configuration of the wire protocol.
    protocol_config: ProtocolConfig,

    /// Configuration of [`RecordStore`] filtering.
    record_filtering: StoreInserts,

    /// The currently active (i.e. in-progress) queries.
    queries: QueryPool,

    /// The currently connected peers.
    ///
    /// This is a superset of the connected peers currently in the routing table.
    connected_peers: FnvHashSet<PeerId>,

    /// Periodic job for re-publication of provider records for keys
    /// provided by the local node.
    add_provider_job: Option<AddProviderJob>,

    /// Periodic job for (re-)replication and (re-)publishing of
    /// regular (value-)records.
    put_record_job: Option<PutRecordJob>,

    /// The TTL of regular (value-)records.
    record_ttl: Option<Duration>,

    /// The TTL of provider records.
    provider_record_ttl: Option<Duration>,

    /// Queued events to return when the behaviour is being polled.
    queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,

    listen_addresses: ListenAddresses,

    external_addresses: ExternalAddresses,

    connections: HashMap<ConnectionId, PeerId>,

    /// See [`Config::caching`].
    caching: Caching,

    local_peer_id: PeerId,

    mode: Mode,
    auto_mode: bool,
    no_events_waker: Option<Waker>,

    /// The record storage.
    store: TStore,

    /// Tracks the status of the current bootstrap.
    bootstrap_status: bootstrap::Status,
}

/// The configurable strategies for the insertion of peers
/// and their addresses into the k-buckets of the Kademlia
/// routing table.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum BucketInserts {
    /// Whenever a connection to a peer is established as a
    /// result of a dialing attempt and that peer is not yet
    /// in the routing table, it is inserted as long as there
    /// is a free slot in the corresponding k-bucket. If the
    /// k-bucket is full but still has a free pending slot,
    /// it may be inserted into the routing table at a later time if an unresponsive
    /// disconnected peer is evicted from the bucket.
    OnConnected,
    /// New peers and addresses are only added to the routing table via
    /// explicit calls to [`Behaviour::add_address`].
    ///
    /// > **Note**: Even though peers can only get into the
    /// > routing table as a result of [`Behaviour::add_address`],
    /// > routing table entries are still updated as peers
    /// > connect and disconnect (i.e. the order of the entries
    /// > as well as the network addresses).
    Manual,
}

/// The configurable filtering strategies for the acceptance of
/// incoming records.
///
/// This can be used for e.g. signature verification or validating
/// the accompanying [`Key`].
///
/// [`Key`]: crate::record::Key
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StoreInserts {
    /// Whenever a (provider) record is received,
    /// the record is forwarded immediately to the [`RecordStore`].
    Unfiltered,
    /// Whenever a (provider) record is received, an event is emitted.
    /// Provider records generate an [`InboundRequest::AddProvider`] under
    /// [`Event::InboundRequest`]; normal records generate an [`InboundRequest::PutRecord`]
    /// under [`Event::InboundRequest`].
    ///
    /// When deemed valid, a (provider) record needs to be explicitly stored in
    /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
    /// whichever is applicable. A mutable reference to the [`RecordStore`] can
    /// be retrieved via [`Behaviour::store_mut`].
    FilterBoth,
}

/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Behaviour::new`].
#[derive(Debug, Clone)]
pub struct Config {
    kbucket_config: KBucketConfig,
    query_config: QueryConfig,
    protocol_config: ProtocolConfig,
    record_ttl: Option<Duration>,
    record_replication_interval: Option<Duration>,
    record_publication_interval: Option<Duration>,
    record_filtering: StoreInserts,
    provider_record_ttl: Option<Duration>,
    provider_publication_interval: Option<Duration>,
    kbucket_inserts: BucketInserts,
    caching: Caching,
    periodic_bootstrap_interval: Option<Duration>,
    automatic_bootstrap_throttle: Option<Duration>,
}

impl Default for Config {
    /// Returns the default configuration.
    ///
    /// Deprecated: use `Config::new` instead.
    fn default() -> Self {
        Self::new(protocol::DEFAULT_PROTO_NAME)
    }
}

/// The configuration for Kademlia "write-back" caching after successful
/// lookups via [`Behaviour::get_record`].
#[derive(Debug, Clone)]
pub enum Caching {
    /// Caching is disabled and the peers closest to records being looked up
    /// that do not return a record are not tracked, i.e.
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
    Disabled,
    /// Up to `max_peers` peers not returning a record that are closest to the key
    /// being looked up are tracked and returned in
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`]. The write-back operation must be
    /// performed explicitly, if desired and after choosing a record from the results, via
    /// [`Behaviour::put_record_to`].
    Enabled { max_peers: u16 },
}

impl Config {
    /// Builds a new `Config` with the given protocol name.
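    ///
    /// A minimal configuration sketch; the protocol name and settings below are
    /// illustrative, not required values:
    ///
    /// ```ignore
    /// use std::time::Duration;
    /// use libp2p_swarm::StreamProtocol;
    ///
    /// let mut config = Config::new(StreamProtocol::new("/my-app/kad/1.0.0"));
    /// config
    ///     .set_query_timeout(Duration::from_secs(120))
    ///     .set_record_filtering(StoreInserts::FilterBoth);
    /// ```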
    pub fn new(protocol_name: StreamProtocol) -> Self {
        Config {
            kbucket_config: KBucketConfig::default(),
            query_config: QueryConfig::default(),
            protocol_config: ProtocolConfig::new(protocol_name),
            record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
            record_replication_interval: Some(Duration::from_secs(60 * 60)),
            record_publication_interval: Some(Duration::from_secs(22 * 60 * 60)),
            record_filtering: StoreInserts::Unfiltered,
            provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
            provider_record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
            kbucket_inserts: BucketInserts::OnConnected,
            caching: Caching::Enabled { max_peers: 1 },
            periodic_bootstrap_interval: Some(Duration::from_secs(5 * 60)),
            automatic_bootstrap_throttle: Some(bootstrap::DEFAULT_AUTOMATIC_THROTTLE),
        }
    }

    /// Sets the timeout for a single query.
    ///
    /// > **Note**: A single query usually comprises at least as many requests
    /// > as the replication factor, i.e. this is not a request timeout.
    ///
    /// The default is 60 seconds.
    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.query_config.timeout = timeout;
        self
    }

    /// Sets the replication factor to use.
    ///
    /// The replication factor determines to how many closest peers
    /// a record is replicated. The default is [`crate::K_VALUE`].
    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
        self.query_config.replication_factor = replication_factor;
        self
    }

    /// Sets the allowed level of parallelism for iterative queries.
    ///
    /// The `α` parameter in the Kademlia paper. The maximum number of peers
    /// that an iterative query is allowed to wait for in parallel while
    /// iterating towards the closest nodes to a target. Defaults to
    /// `ALPHA_VALUE`.
    ///
    /// This only controls the level of parallelism of an iterative query, not
    /// the level of parallelism of a query to a fixed set of peers.
    ///
    /// When used with [`Config::disjoint_query_paths`] it equals
    /// the number of disjoint paths used.
    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
        self.query_config.parallelism = parallelism;
        self
    }

    /// Require iterative queries to use disjoint paths for increased resiliency
    /// in the presence of potentially adversarial nodes.
    ///
    /// When enabled the number of disjoint paths used equals the configured
    /// parallelism.
    ///
    /// See the S/Kademlia paper for more information on the high level design
    /// as well as its security improvements.
    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
        self.query_config.disjoint_query_paths = enabled;
        self
    }

    /// Sets the TTL for stored records.
    ///
    /// The TTL should be significantly longer than the (re-)publication
    /// interval, to avoid premature expiration of records. The default is 48
    /// hours.
    ///
    /// `None` means records never expire.
    ///
    /// Does not apply to provider records.
    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
        self.record_ttl = record_ttl;
        self
    }

    /// Sets whether or not records should be filtered before being stored.
    ///
    /// See [`StoreInserts`] for the different values.
    /// Defaults to [`StoreInserts::Unfiltered`].
    pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
        self.record_filtering = filtering;
        self
    }

    /// Sets the (re-)replication interval for stored records.
    ///
    /// Periodic replication of stored records ensures that the records
    /// are always replicated to the available nodes closest to the key in the
    /// context of DHT topology changes (i.e. nodes joining and leaving), thus
    /// ensuring persistence until the record expires. Replication does not
    /// prolong the regular lifetime of a record (for otherwise it would live
    /// forever regardless of the configured TTL). The expiry of a record
    /// is only extended through re-publication.
    ///
    /// This interval should be significantly shorter than the publication
    /// interval, to ensure persistence between re-publications. The default
    /// is 1 hour.
    ///
    /// `None` means that stored records are never re-replicated.
    ///
    /// Does not apply to provider records.
    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_replication_interval = interval;
        self
    }

    /// Sets the (re-)publication interval of stored records.
    ///
    /// Records persist in the DHT until they expire. By default, published
    /// records are re-published at regular intervals for as long as the record
    /// exists in the local storage of the original publisher, thereby extending
    /// the record's lifetime.
    ///
    /// This interval should be significantly shorter than the record TTL, to
    /// ensure records do not expire prematurely. The default is 22 hours.
    ///
    /// `None` means that stored records are never automatically re-published.
    ///
    /// Does not apply to provider records.
    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_publication_interval = interval;
        self
    }

    /// Sets the TTL for provider records.
    ///
    /// `None` means that stored provider records never expire.
    ///
    /// Must be significantly larger than the provider publication interval.
    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
        self.provider_record_ttl = ttl;
        self
    }

    /// Sets the interval at which provider records for keys provided
    /// by the local node are re-published.
    ///
    /// `None` means that stored provider records are never automatically
    /// re-published.
    ///
    /// Must be significantly less than the provider record TTL.
    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.provider_publication_interval = interval;
        self
    }

    /// Modifies the maximum allowed size of individual Kademlia packets.
    ///
    /// It might be necessary to increase this value if trying to put large
    /// records.
    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
        self.protocol_config.set_max_packet_size(size);
        self
    }

    /// Sets the k-bucket insertion strategy for the Kademlia routing table.
    pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
        self.kbucket_inserts = inserts;
        self
    }

    /// Sets the [`Caching`] strategy to use for successful lookups.
    ///
    /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
    /// Hence, with default settings and a lookup quorum of 1, a successful lookup
    /// will result in the record being cached at the closest node to the key that
    /// did not return the record, i.e. the standard Kademlia behaviour.
    pub fn set_caching(&mut self, c: Caching) -> &mut Self {
        self.caching = c;
        self
    }

    /// Sets the interval on which [`Behaviour::bootstrap`] is called periodically.
    ///
    /// * Defaults to `5` minutes.
    /// * Set to `None` to disable periodic bootstrap.
    pub fn set_periodic_bootstrap_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.periodic_bootstrap_interval = interval;
        self
    }

    /// Sets the configuration for the k-buckets.
    ///
    /// * Defaults to `K_VALUE`.
    ///
    /// **WARNING**: setting a `size` higher than `K_VALUE` may imply additional memory allocations.
    pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
        self.kbucket_config.set_bucket_size(size);
        self
    }

    /// Sets the timeout duration after creation of a pending entry after which
    /// it becomes eligible for insertion into a full bucket, replacing the
    /// least-recently (dis)connected node.
    ///
    /// * Defaults to `60` s.
    pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.kbucket_config.set_pending_timeout(timeout);
        self
    }

    /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted
    /// in the routing table. This prevents cascading bootstrap requests when multiple peers are
    /// inserted into the routing table "at the same time". It also allows waiting a little
    /// for other potential peers to be inserted into the routing table before triggering a
    /// bootstrap, giving more context to the future bootstrap request.
    ///
    /// * Defaults to `500` ms.
    /// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a
    ///   new peer is inserted in the routing table.
    /// * Set to `None` to disable automatic bootstrap (no bootstrap request will be triggered when
    ///   a new peer is inserted in the routing table).
    #[cfg(test)]
    pub(crate) fn set_automatic_bootstrap_throttle(
        &mut self,
        duration: Option<Duration>,
    ) -> &mut Self {
        self.automatic_bootstrap_throttle = duration;
        self
    }
}

impl<TStore> Behaviour<TStore>
where
    TStore: RecordStore + Send + 'static,
{
    /// Creates a new `Kademlia` network behaviour with a default configuration.
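    ///
    /// A minimal construction sketch, assuming the in-memory record store
    /// shipped with this crate (`store::MemoryStore`):
    ///
    /// ```ignore
    /// use libp2p_identity::PeerId;
    ///
    /// let local_peer_id = PeerId::random();
    /// let store = libp2p_kad::store::MemoryStore::new(local_peer_id);
    /// let behaviour = libp2p_kad::Behaviour::new(local_peer_id, store);
    /// ```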
    pub fn new(id: PeerId, store: TStore) -> Self {
        Self::with_config(id, store, Default::default())
    }

    /// Gets the protocol names of this Kademlia instance.
    pub fn protocol_names(&self) -> &[StreamProtocol] {
        self.protocol_config.protocol_names()
    }

    /// Creates a new `Kademlia` network behaviour with the given configuration.
    pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
        let local_key = kbucket::Key::from(id);

        let put_record_job = config
            .record_replication_interval
            .or(config.record_publication_interval)
            .map(|interval| {
                PutRecordJob::new(
                    id,
                    interval,
                    config.record_publication_interval,
                    config.record_ttl,
                )
            });

        let add_provider_job = config
            .provider_publication_interval
            .map(AddProviderJob::new);

        Behaviour {
            store,
            caching: config.caching,
            kbuckets: KBucketsTable::new(local_key, config.kbucket_config),
            kbucket_inserts: config.kbucket_inserts,
            protocol_config: config.protocol_config,
            record_filtering: config.record_filtering,
            queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
            listen_addresses: Default::default(),
            queries: QueryPool::new(config.query_config),
            connected_peers: Default::default(),
            add_provider_job,
            put_record_job,
            record_ttl: config.record_ttl,
            provider_record_ttl: config.provider_record_ttl,
            external_addresses: Default::default(),
            local_peer_id: id,
            connections: Default::default(),
            mode: Mode::Client,
            auto_mode: true,
            no_events_waker: None,
            bootstrap_status: bootstrap::Status::new(
                config.periodic_bootstrap_interval,
                config.automatic_bootstrap_throttle,
            ),
        }
    }

    /// Gets an iterator over immutable references to all running queries.
    pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
        self.queries.iter().filter_map(|query| {
            if !query.is_finished() {
                Some(QueryRef { query })
            } else {
                None
            }
        })
    }

    /// Gets an iterator over mutable references to all running queries.
    pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
        self.queries.iter_mut().filter_map(|query| {
            if !query.is_finished() {
                Some(QueryMut { query })
            } else {
                None
            }
        })
    }

    /// Gets an immutable reference to a running query, if it exists.
    pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
        self.queries.get(id).and_then(|query| {
            if !query.is_finished() {
                Some(QueryRef { query })
            } else {
                None
            }
        })
    }

    /// Gets a mutable reference to a running query, if it exists.
    pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
        self.queries.get_mut(id).and_then(|query| {
            if !query.is_finished() {
                Some(QueryMut { query })
            } else {
                None
            }
        })
    }

    /// Adds a known listen address of a peer participating in the DHT to the
    /// routing table.
    ///
    /// Explicitly adding addresses of peers serves two purposes:
    ///
    ///   1. In order for a node to join the DHT, it must know about at least one other node of the
    ///      DHT.
    ///
    ///   2. When a remote peer initiates a connection and that peer is not yet in the routing
    ///      table, the `Kademlia` behaviour must be informed of an address on which that peer is
    ///      listening for connections before it can be added to the routing table from where it can
    ///      subsequently be discovered by all peers in the DHT.
    ///
    /// If the routing table has been updated as a result of this operation,
    /// an [`Event::RoutingUpdated`] event is emitted.
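    ///
    /// A usage sketch; the peer ID and address are illustrative placeholders:
    ///
    /// ```ignore
    /// let peer = PeerId::random();
    /// let addr: Multiaddr = "/ip4/203.0.113.5/tcp/4001".parse().unwrap();
    /// behaviour.add_address(&peer, addr);
    /// ```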
    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
        // ensuring address is a fully-qualified /p2p multiaddr
        let Ok(address) = address.with_p2p(*peer) else {
            return RoutingUpdate::Failed;
        };
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key) {
            Some(kbucket::Entry::Present(mut entry, _)) => {
                if entry.value().insert(address) {
                    self.queued_events
                        .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
                            peer: *peer,
                            is_new_peer: false,
                            addresses: entry.value().clone(),
                            old_peer: None,
                            bucket_range: self
                                .kbuckets
                                .bucket(&key)
                                .map(|b| b.range())
                                .expect("Not kbucket::Entry::SelfEntry."),
                        }))
                }
                RoutingUpdate::Success
            }
            Some(kbucket::Entry::Pending(mut entry, _)) => {
                entry.value().insert(address);
                RoutingUpdate::Pending
            }
            Some(kbucket::Entry::Absent(entry)) => {
                let addresses = Addresses::new(address);
                let status = if self.connected_peers.contains(peer) {
                    NodeStatus::Connected
                } else {
                    NodeStatus::Disconnected
                };
                match entry.insert(addresses.clone(), status) {
                    kbucket::InsertResult::Inserted => {
                        self.bootstrap_on_low_peers();

                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::RoutingUpdated {
                                peer: *peer,
                                is_new_peer: true,
                                addresses,
                                old_peer: None,
                                bucket_range: self
                                    .kbuckets
                                    .bucket(&key)
                                    .map(|b| b.range())
                                    .expect("Not kbucket::Entry::SelfEntry."),
                            },
                        ));
                        RoutingUpdate::Success
                    }
                    kbucket::InsertResult::Full => {
                        tracing::debug!(%peer, "Bucket full. Peer not added to routing table");
                        RoutingUpdate::Failed
                    }
                    kbucket::InsertResult::Pending { disconnected } => {
                        self.queued_events.push_back(ToSwarm::Dial {
                            opts: DialOpts::peer_id(disconnected.into_preimage()).build(),
                        });
                        RoutingUpdate::Pending
                    }
                }
            }
            None => RoutingUpdate::Failed,
        }
    }

    /// Removes an address of a peer from the routing table.
    ///
    /// If the given address is the last address of the peer in the
    /// routing table, the peer is removed from the routing table
    /// and `Some` is returned with a view of the removed entry.
    /// The same applies if the peer is currently pending insertion
    /// into the routing table.
    ///
    /// If the given peer or address is not in the routing table,
    /// this is a no-op.
    pub fn remove_address(
        &mut self,
        peer: &PeerId,
        address: &Multiaddr,
    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
        let address = &address.to_owned().with_p2p(*peer).ok()?;
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key)? {
            kbucket::Entry::Present(mut entry, _) => {
                if entry.value().remove(address).is_err() {
                    Some(entry.remove()) // it is the last address, thus remove the peer.
                } else {
                    None
                }
            }
            kbucket::Entry::Pending(mut entry, _) => {
                if entry.value().remove(address).is_err() {
                    Some(entry.remove()) // it is the last address, thus remove the peer.
                } else {
                    None
                }
            }
            kbucket::Entry::Absent(..) => None,
        }
    }

    /// Removes a peer from the routing table.
    ///
    /// Returns `None` if the peer was not in the routing table,
    /// not even pending insertion.
    pub fn remove_peer(
        &mut self,
        peer: &PeerId,
    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key)? {
            kbucket::Entry::Present(entry, _) => Some(entry.remove()),
            kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
            kbucket::Entry::Absent(..) => None,
        }
    }

    /// Returns an iterator over all non-empty buckets in the routing table.
    pub fn kbuckets(
        &mut self,
    ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
        self.kbuckets.iter().filter(|b| !b.is_empty())
    }

    /// Returns the k-bucket for the distance to the given key.
    ///
    /// Returns `None` if the given key refers to the local key.
    pub fn kbucket<K>(
        &mut self,
        key: K,
    ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
    where
        K: Into<kbucket::Key<K>> + Clone,
    {
        self.kbuckets.bucket(&key.into())
    }

    /// Initiates an iterative query for the closest peers to the given key.
    ///
    /// The result of the query is delivered in
    /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
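    ///
    /// A usage sketch, searching for the peers closest to a random `PeerId`:
    ///
    /// ```ignore
    /// let query_id = behaviour.get_closest_peers(PeerId::random());
    /// ```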
    pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
    where
        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
    {
        self.get_closest_peers_inner(key, None)
    }

    /// Initiates an iterative query for the closest peers to the given key.
    /// The number of expected responding peers is specified by `num_results`.
    /// Note that the result is capped at `K_VALUE`.
    ///
    /// The result of the query is delivered in
    /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
    pub fn get_n_closest_peers<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
    where
        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
    {
        // The inner code never expects more than K_VALUE results to be returned.
        // Removing this cap would be tricky, since it would involve forging a new
        // key and issuing additional requests.
        // Hence we bound `num_results` to K_VALUE here to set clear expectations
        // and prevent unexpected behaviour.
        let capped_num_results = std::cmp::min(num_results, K_VALUE);
        self.get_closest_peers_inner(key, Some(capped_num_results))
    }

    fn get_closest_peers_inner<K>(&mut self, key: K, num_results: Option<NonZeroUsize>) -> QueryId
    where
        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
    {
        let target: kbucket::Key<K> = key.clone().into();
        let key: Vec<u8> = key.into();
        let info = QueryInfo::GetClosestPeers {
            key,
            step: ProgressStep::first(),
            num_results,
        };
        let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
        self.queries.add_iter_closest(target, peer_keys, info)
    }

    /// Returns all peers ordered by distance to the given key; takes peers from the
    /// local routing table only.
    pub fn get_closest_local_peers<'a, K: Clone>(
        &'a mut self,
        key: &'a kbucket::Key<K>,
    ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
        self.kbuckets.closest_keys(key)
    }

    /// Finds the closest peers to a `key` in the context of a request by the `source` peer, such
    /// that the `source` peer is never included in the result.
    ///
    /// Takes peers from the local routing table only and returns at most as many
    /// peers as the configured replication factor.
    pub fn find_closest_local_peers<'a, K: Clone>(
        &'a mut self,
        key: &'a kbucket::Key<K>,
        source: &'a PeerId,
    ) -> impl Iterator<Item = KadPeer> + 'a {
        self.kbuckets
            .closest(key)
            .filter(move |e| e.node.key.preimage() != source)
            .take(self.queries.config().replication_factor.get())
            .map(KadPeer::from)
    }

    /// Performs a lookup for a record in the DHT.
    ///
    /// The result of this operation is delivered in
    /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
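    ///
    /// A lookup sketch; the key mirrors the illustrative one used for
    /// [`Behaviour::put_record`]:
    ///
    /// ```ignore
    /// let query_id = behaviour.get_record(record::Key::new(&"my-key"));
    /// // The record, if found, arrives via Event::OutboundQueryProgressed.
    /// ```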
    pub fn get_record(&mut self, key: record::Key) -> QueryId {
        let record = if let Some(record) = self.store.get(&key) {
            if record.is_expired(Instant::now()) {
                self.store.remove(&key);
                None
            } else {
                Some(PeerRecord {
                    peer: None,
                    record: record.into_owned(),
                })
            }
        } else {
            None
        };

        let step = ProgressStep::first();

        let target = kbucket::Key::new(key.clone());
        let info = if record.is_some() {
            QueryInfo::GetRecord {
                key,
                step: step.next(),
                found_a_record: true,
                cache_candidates: BTreeMap::new(),
            }
        } else {
            QueryInfo::GetRecord {
                key,
                step: step.clone(),
                found_a_record: false,
                cache_candidates: BTreeMap::new(),
            }
        };
        let peers = self.kbuckets.closest_keys(&target);
        let id = self.queries.add_iter_closest(target.clone(), peers, info);

        // No queries were actually done for the results yet.
        let stats = QueryStats::empty();

        if let Some(record) = record {
            self.queued_events
                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
                    id,
                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
                    step,
                    stats,
                }));
        }

        id
    }

    /// Stores a record in the DHT, locally as well as at the nodes
    /// closest to the key as per the xor distance metric.
    ///
    /// Returns `Ok` if a record has been stored locally, providing the
    /// `QueryId` of the initial query that replicates the record in the DHT.
    /// The result of the query is eventually reported via
    /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
    ///
    /// The record is always stored locally with the given expiration. If the record's
    /// expiration is `None`, the common case, it does not expire in local storage
    /// but is still replicated with the configured record TTL. To remove the record
    /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
    ///
    /// After the initial publication of the record, it is subject to (re-)replication
    /// and (re-)publication as per the configured intervals. Periodic (re-)publication
    /// does not update the record's expiration in local storage, thus a given record
    /// with an explicit expiration will always expire at that instant and until then
    /// is subject to regular (re-)replication and (re-)publication.
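    ///
    /// A publishing sketch; key and value are illustrative:
    ///
    /// ```ignore
    /// let record = Record::new(record::Key::new(&"my-key"), b"my-value".to_vec());
    /// let query_id = behaviour.put_record(record, Quorum::One)?;
    /// ```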
    pub fn put_record(
        &mut self,
        mut record: Record,
        quorum: Quorum,
    ) -> Result<QueryId, store::Error> {
        record.publisher = Some(*self.kbuckets.local_key().preimage());
        self.store.put(record.clone())?;
        record.expires = record
            .expires
            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let target = kbucket::Key::new(record.key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let context = PutRecordContext::Publish;
        let info = QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase: PutRecordPhase::GetClosestPeers,
        };
        Ok(self.queries.add_iter_closest(target.clone(), peers, info))
    }

    /// Stores a record at specific peers, without storing it locally.
    ///
    /// The given [`Quorum`] is understood in the context of the total
    /// number of distinct peers given.
    ///
    /// If the record's expiration is `None`, the configured record TTL is used.
    ///
    /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be
    /// > used to selectively update or store a record to specific peers
    /// > for the purpose of e.g. making sure these peers have the latest
    /// > "version" of a record or to "cache" a record at further peers
    /// > to increase the lookup success rate on the DHT for other peers.
    /// >
    /// > In particular, there is no automatic storing of records performed, and this
    /// > method must be used to ensure the standard Kademlia
    /// > procedure of "caching" (i.e. storing) a found record at the closest
    /// > node to the key that _did not_ return it.
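    ///
    /// A write-back sketch; `record` and the hypothetical peer IDs are assumed to
    /// come from a previous [`Behaviour::get_record`] lookup and its cache candidates:
    ///
    /// ```ignore
    /// let cache_candidates = vec![peer_a, peer_b]; // hypothetical peers
    /// let query_id = behaviour.put_record_to(record, cache_candidates.into_iter(), Quorum::All);
    /// ```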
    pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
    where
        I: ExactSizeIterator<Item = PeerId>,
    {
        let quorum = if peers.len() > 0 {
            quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
        } else {
            // If no peers are given, we just let the query fail immediately
            // due to the fact that the quorum must be at least one, instead of
            // introducing a new kind of error.
            NonZeroUsize::new(1).expect("1 > 0")
        };
        record.expires = record
            .expires
            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
        let context = PutRecordContext::Custom;
        let info = QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase: PutRecordPhase::PutRecord {
                success: Vec::new(),
                get_closest_peers_stats: QueryStats::empty(),
            },
        };
        self.queries.add_fixed(peers, info)
    }

    /// Removes the record with the given key from _local_ storage,
    /// if the local node is the publisher of the record.
    ///
    /// Has no effect if a record for the given key is stored locally but
    /// the local node is not a publisher of the record.
    ///
    /// This is a _local_ operation. However, it also has the effect that
    /// the record will no longer be periodically re-published, allowing the
    /// record to eventually expire throughout the DHT.
    pub fn remove_record(&mut self, key: &record::Key) {
        if let Some(r) = self.store.get(key) {
            if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                self.store.remove(key)
            }
        }
    }

    /// Gets a mutable reference to the record store.
    pub fn store_mut(&mut self) -> &mut TStore {
        &mut self.store
    }

    /// Bootstraps the local node to join the DHT.
    ///
    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
    /// own ID in the DHT. This introduces the local node to the other nodes
    /// in the DHT and populates its routing table with the closest neighbours.
    ///
    /// Subsequently, all buckets farther from the bucket of the closest neighbour are
    /// refreshed by initiating an additional bootstrapping query for each such
    /// bucket with random keys.
    ///
    /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
    /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
    /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
    /// with one such event per bootstrapping query.
    ///
    /// Returns `Err` if bootstrapping is impossible due to an empty routing table.
    ///
    /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
    /// > See [`Behaviour::add_address`].
    ///
    /// > **Note**: Bootstrapping does not need to be triggered manually. It is invoked
    /// > periodically at the configured `periodic_bootstrap_interval` (see
    /// > [`Config::set_periodic_bootstrap_interval`] for details) and is also triggered
    /// > automatically when a new peer is inserted in the routing table, keeping the
    /// > routing table healthy.
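    ///
    /// A sketch; `bootnode_peer_id` and `bootnode_addr` are hypothetical
    /// placeholders for a known DHT node:
    ///
    /// ```ignore
    /// behaviour.add_address(&bootnode_peer_id, bootnode_addr);
    /// let query_id = behaviour.bootstrap()?;
    /// ```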
    pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
        let local_key = *self.kbuckets.local_key();
        let info = QueryInfo::Bootstrap {
            peer: *local_key.preimage(),
            remaining: None,
            step: ProgressStep::first(),
        };
        let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
        if peers.is_empty() {
            self.bootstrap_status.reset_timers();
            Err(NoKnownPeers())
        } else {
            self.bootstrap_status.on_started();
            Ok(self.queries.add_iter_closest(local_key, peers, info))
        }
    }

    /// Establishes the local node as a provider of a value for the given key.
    ///
    /// This operation publishes a provider record with the given key and
    /// identity of the local node to the peers closest to the key, thus establishing
    /// the local node as a provider.
    ///
    /// Returns `Ok` if a provider record has been stored locally, providing the
    /// `QueryId` of the initial query that announces the local node as a provider.
    ///
    /// The publication of the provider records is periodically repeated as per the
    /// configured interval, to renew the expiry and account for changes to the DHT
    /// topology. A provider record may be removed from local storage and
    /// thus no longer re-published by calling [`Behaviour::stop_providing`].
    ///
    /// In contrast to the standard Kademlia push-based model for content distribution
    /// implemented by [`Behaviour::put_record`], the provider API implements a
    /// pull-based model that may be used in addition or as an alternative.
    /// The means by which the actual value is obtained from a provider is out of scope
    /// of the libp2p Kademlia provider API.
    ///
    /// The results of the (repeated) provider announcements sent by this node are
    /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
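    ///
    /// An announcement sketch; the key is illustrative:
    ///
    /// ```ignore
    /// let query_id = behaviour.start_providing(record::Key::new(&"my-content"))?;
    /// ```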
    pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
        // Note: We store our own provider records locally without local addresses
        // to avoid redundant storage and outdated addresses. Instead these are
        // acquired on demand when returning a `ProviderRecord` for the local node.
        let local_addrs = Vec::new();
        let record = ProviderRecord::new(
            key.clone(),
            *self.kbuckets.local_key().preimage(),
            local_addrs,
        );
        self.store.add_provider(record)?;
        let target = kbucket::Key::new(key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let context = AddProviderContext::Publish;
        let info = QueryInfo::AddProvider {
            context,
            key,
            phase: AddProviderPhase::GetClosestPeers,
        };
        let id = self.queries.add_iter_closest(target.clone(), peers, info);
        Ok(id)
    }

    /// Stops the local node from announcing that it is a provider for the given key.
    ///
    /// This is a local operation. The local node will still be considered as a
    /// provider for the key by other nodes until these provider records expire.
    pub fn stop_providing(&mut self, key: &record::Key) {
        self.store
            .remove_provider(key, self.kbuckets.local_key().preimage());
    }

    /// Performs a lookup for providers of a value to the given key.
    ///
    /// The result of this operation is reported via
    /// [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
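    ///
    /// A lookup sketch; the key mirrors the illustrative one used for
    /// [`Behaviour::start_providing`]:
    ///
    /// ```ignore
    /// let query_id = behaviour.get_providers(record::Key::new(&"my-content"));
    /// ```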
    pub fn get_providers(&mut self, key: record::Key) -> QueryId {
        let providers: HashSet<_> = self
            .store
            .providers(&key)
            .into_iter()
            .filter(|p| !p.is_expired(Instant::now()))
            .map(|p| p.provider)
            .collect();

        let step = ProgressStep::first();

        let info = QueryInfo::GetProviders {
            key: key.clone(),
            providers_found: providers.len(),
            step: if providers.is_empty() {
                step.clone()
            } else {
                step.next()
            },
        };

        let target = kbucket::Key::new(key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let id = self.queries.add_iter_closest(target.clone(), peers, info);

        // No queries were actually done for the results yet.
        let stats = QueryStats::empty();

        if !providers.is_empty() {
            self.queued_events
                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
                    id,
                    result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
                        key,
                        providers,
                    })),
                    step,
                    stats,
                }));
        }
        id
    }

    /// Set the [`Mode`] in which we should operate.
    ///
    /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we
    /// have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`].
    ///
    /// Setting a mode via this function disables this automatic behaviour and unconditionally
    /// operates in the specified mode. To reactivate the automatic configuration, pass [`None`]
    /// instead.
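    ///
    /// A sketch of the two ways to use this:
    ///
    /// ```ignore
    /// behaviour.set_mode(Some(Mode::Server)); // unconditionally operate as a server
    /// behaviour.set_mode(None);               // restore automatic mode selection
    /// ```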
    pub fn set_mode(&mut self, mode: Option<Mode>) {
        match mode {
            Some(mode) => {
                self.mode = mode;
                self.auto_mode = false;
                self.reconfigure_mode();
            }
            None => {
                self.auto_mode = true;
                self.determine_mode_from_external_addresses();
            }
        }

        if let Some(waker) = self.no_events_waker.take() {
            waker.wake();
        }
    }

    /// Get the [`Mode`] in which the DHT is currently operating.
    pub fn mode(&self) -> Mode {
        self.mode
    }

    fn reconfigure_mode(&mut self) {
        if self.connections.is_empty() {
            return;
        }

        let num_connections = self.connections.len();

        tracing::debug!(
            "Re-configuring {} established connection{}",
            num_connections,
            if num_connections > 1 { "s" } else { "" }
        );

        self.queued_events
            .extend(
                self.connections
                    .iter()
                    .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
                        peer_id: *peer_id,
                        handler: NotifyHandler::One(*conn_id),
                        event: HandlerIn::ReconfigureMode {
                            new_mode: self.mode,
                        },
                    }),
            );
    }

    fn determine_mode_from_external_addresses(&mut self) {
        let old_mode = self.mode;

        self.mode = match (self.external_addresses.as_slice(), self.mode) {
            ([], Mode::Server) => {
                tracing::debug!("Switching to client-mode because we no longer have any confirmed external addresses");

                Mode::Client
            }
            ([], Mode::Client) => {
                // Previously client-mode, now also client-mode because no external addresses.

                Mode::Client
            }
            (confirmed_external_addresses, Mode::Client) => {
                if tracing::enabled!(Level::DEBUG) {
                    let confirmed_external_addresses =
                        to_comma_separated_list(confirmed_external_addresses);

                    tracing::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
                }

                Mode::Server
            }
            (confirmed_external_addresses, Mode::Server) => {
                debug_assert!(
                    !confirmed_external_addresses.is_empty(),
                    "Previous match arm handled empty list"
                );

                // Previously server-mode, now also server-mode because we still have at
                // least one confirmed external address. Don't log anything to avoid spam.
1184                Mode::Server
1185            }
1186        };
1187
1188        self.reconfigure_mode();
1189
1190        if old_mode != self.mode {
1191            self.queued_events
1192                .push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
1193                    new_mode: self.mode,
1194                }));
1195        }
1196    }
1197
1198    /// Processes discovered peers from a successful request in an iterative `Query`.
1199    fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1200    where
1201        I: Iterator<Item = &'a KadPeer> + Clone,
1202    {
1203        let local_id = self.kbuckets.local_key().preimage();
1204        let others_iter = peers.filter(|p| &p.node_id != local_id);
1205        if let Some(query) = self.queries.get_mut(query_id) {
1206            tracing::trace!(peer=%source, query=?query_id, "Request to peer in query succeeded");
1207            for peer in others_iter.clone() {
1208                tracing::trace!(
1209                    ?peer,
1210                    %source,
1211                    query=?query_id,
1212                    "Peer reported by source in query"
1213                );
1214                let addrs = peer.multiaddrs.iter().cloned().collect();
1215                query.peers.addresses.insert(peer.node_id, addrs);
1216            }
1217            query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1218        }
1219    }
1220
1221    /// Collects all peers who are known to be providers of the value for a given `Multihash`.
1222    fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
1223        let kbuckets = &mut self.kbuckets;
1224        let connected = &mut self.connected_peers;
1225        let listen_addresses = &self.listen_addresses;
1226        let external_addresses = &self.external_addresses;
1227
1228        self.store
1229            .providers(key)
1230            .into_iter()
1231            .filter_map(move |p| {
1232                if &p.provider != source {
1233                    let node_id = p.provider;
1234                    let multiaddrs = p.addresses;
1235                    let connection_ty = if connected.contains(&node_id) {
1236                        ConnectionType::Connected
1237                    } else {
1238                        ConnectionType::NotConnected
1239                    };
1240                    if multiaddrs.is_empty() {
1241                        // The provider is either the local node and we fill in
1242                        // the local addresses on demand, or it is a legacy
1243                        // provider record without addresses, in which case we
1244                        // try to find addresses in the routing table, as was
1245                        // done before provider records were stored along with
1246                        // their addresses.
1247                        if &node_id == kbuckets.local_key().preimage() {
1248                            Some(
1249                                listen_addresses
1250                                    .iter()
1251                                    .chain(external_addresses.iter())
1252                                    .cloned()
1253                                    .collect::<Vec<_>>(),
1254                            )
1255                        } else {
1256                            let key = kbucket::Key::from(node_id);
1257                            kbuckets
1258                                .entry(&key)
1259                                .as_mut()
1260                                .and_then(|e| e.view())
1261                                .map(|e| e.node.value.clone().into_vec())
1262                        }
1263                    } else {
1264                        Some(multiaddrs)
1265                    }
1266                    .map(|multiaddrs| KadPeer {
1267                        node_id,
1268                        multiaddrs,
1269                        connection_ty,
1270                    })
1271                } else {
1272                    None
1273                }
1274            })
1275            .take(self.queries.config().replication_factor.get())
1276            .collect()
1277    }
1278
1279    /// Starts an iterative `ADD_PROVIDER` query for the given key.
1280    fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
1281        let info = QueryInfo::AddProvider {
1282            context,
1283            key: key.clone(),
1284            phase: AddProviderPhase::GetClosestPeers,
1285        };
1286        let target = kbucket::Key::new(key);
1287        let peers = self.kbuckets.closest_keys(&target);
1288        self.queries.add_iter_closest(target.clone(), peers, info);
1289    }
1290
1291    /// Starts an iterative `PUT_VALUE` query for the given record.
1292    fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1293        let quorum = quorum.eval(self.queries.config().replication_factor);
1294        let target = kbucket::Key::new(record.key.clone());
1295        let peers = self.kbuckets.closest_keys(&target);
1296        let info = QueryInfo::PutRecord {
1297            record,
1298            quorum,
1299            context,
1300            phase: PutRecordPhase::GetClosestPeers,
1301        };
1302        self.queries.add_iter_closest(target.clone(), peers, info);
1303    }
1304
1305    /// Updates the routing table with a new connection status and address of a peer.
1306    fn connection_updated(
1307        &mut self,
1308        peer: PeerId,
1309        address: Option<Multiaddr>,
1310        new_status: NodeStatus,
1311    ) {
1312        let key = kbucket::Key::from(peer);
1313        match self.kbuckets.entry(&key) {
1314            Some(kbucket::Entry::Present(mut entry, old_status)) => {
1315                if old_status != new_status {
1316                    entry.update(new_status)
1317                }
1318                if let Some(address) = address {
1319                    if entry.value().insert(address) {
1320                        self.queued_events.push_back(ToSwarm::GenerateEvent(
1321                            Event::RoutingUpdated {
1322                                peer,
1323                                is_new_peer: false,
1324                                addresses: entry.value().clone(),
1325                                old_peer: None,
1326                                bucket_range: self
1327                                    .kbuckets
1328                                    .bucket(&key)
1329                                    .map(|b| b.range())
1330                                    .expect("Not kbucket::Entry::SelfEntry."),
1331                            },
1332                        ))
1333                    }
1334                }
1335            }
1336
1337            Some(kbucket::Entry::Pending(mut entry, old_status)) => {
1338                if let Some(address) = address {
1339                    entry.value().insert(address);
1340                }
1341                if old_status != new_status {
1342                    entry.update(new_status);
1343                }
1344            }
1345
1346            Some(kbucket::Entry::Absent(entry)) => {
1347                // Only connected nodes with a known address are newly inserted.
1348                if new_status != NodeStatus::Connected {
1349                    return;
1350                }
1351                match (address, self.kbucket_inserts) {
1352                    (None, _) => {
1353                        self.queued_events
1354                            .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
1355                    }
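                    // With `BucketInserts::Manual`, the peer is only reported as
                    // routable; inserting it (e.g. via `Behaviour::add_address`)
                    // is left to the user.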
1356                    (Some(a), BucketInserts::Manual) => {
1357                        self.queued_events
1358                            .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
1359                                peer,
1360                                address: a,
1361                            }));
1362                    }
1363                    (Some(a), BucketInserts::OnConnected) => {
1364                        let addresses = Addresses::new(a);
1365                        match entry.insert(addresses.clone(), new_status) {
1366                            kbucket::InsertResult::Inserted => {
1367                                self.bootstrap_on_low_peers();
1368
1369                                let event = Event::RoutingUpdated {
1370                                    peer,
1371                                    is_new_peer: true,
1372                                    addresses,
1373                                    old_peer: None,
1374                                    bucket_range: self
1375                                        .kbuckets
1376                                        .bucket(&key)
1377                                        .map(|b| b.range())
1378                                        .expect("Not kbucket::Entry::SelfEntry."),
1379                                };
1380                                self.queued_events.push_back(ToSwarm::GenerateEvent(event));
1381                            }
1382                            kbucket::InsertResult::Full => {
1383                                tracing::debug!(
1384                                    %peer,
1385                                    "Bucket full. Peer not added to routing table"
1386                                );
1387                                let address = addresses.first().clone();
1388                                self.queued_events.push_back(ToSwarm::GenerateEvent(
1389                                    Event::RoutablePeer { peer, address },
1390                                ));
1391                            }
1392                            kbucket::InsertResult::Pending { disconnected } => {
1393                                let address = addresses.first().clone();
1394                                self.queued_events.push_back(ToSwarm::GenerateEvent(
1395                                    Event::PendingRoutablePeer { peer, address },
1396                                ));
1397
1398                                // `disconnected` might already be in the process of re-connecting.
1399                                // In other words `disconnected` might have already re-connected but
1400                                // is not yet confirmed to support the Kademlia protocol via
1401                                // [`HandlerEvent::ProtocolConfirmed`].
1402                                //
1403                                // Only try dialing peer if not currently connected.
1404                                if !self.connected_peers.contains(disconnected.preimage()) {
1405                                    self.queued_events.push_back(ToSwarm::Dial {
1406                                        opts: DialOpts::peer_id(disconnected.into_preimage())
1407                                            .build(),
1408                                    })
1409                                }
1410                            }
1411                        }
1412                    }
1413                }
1414            }
1415            _ => {}
1416        }
1417    }
1418
1419    /// Triggers a bootstrap if the routing table is currently small, i.e. if
1420    /// fewer than `K_VALUE` peers are present after a new peer has been
1421    /// inserted.
1422    fn bootstrap_on_low_peers(&mut self) {
1423        if self
1424            .kbuckets()
1425            .map(|kbucket| kbucket.num_entries())
1426            .sum::<usize>()
1427            < K_VALUE.get()
1428        {
1429            self.bootstrap_status.trigger();
1430        }
1431    }
1432
1433    /// Handles a finished (i.e. successful) query.
1434    fn query_finished(&mut self, q: Query) -> Option<Event> {
1435        let query_id = q.id();
1436        tracing::trace!(query=?query_id, "Query finished");
1437        match q.info {
1438            QueryInfo::Bootstrap {
1439                peer,
1440                remaining,
1441                mut step,
1442            } => {
1443                let local_key = *self.kbuckets.local_key();
1444                let mut remaining = remaining.unwrap_or_else(|| {
1445                    debug_assert_eq!(&peer, local_key.preimage());
1446                    // The lookup for the local key finished. To complete the bootstrap process,
1447                    // a bucket refresh should be performed for every bucket farther away than
1448                    // the first non-empty bucket (which are most likely no more than the last
1449                    // few, i.e. farthest, buckets).
1450                    self.kbuckets
1451                        .iter()
1452                        .skip_while(|b| b.is_empty())
1453                        .skip(1) // Skip the bucket with the closest neighbour.
1454                        .map(|b| {
1455                            // Try to find a key that falls into the bucket. While such keys can
1456                            // be generated fully deterministically, the current libp2p kademlia
1457                            // wire protocol requires transmission of the preimages of the actual
1458                            // keys in the DHT keyspace, hence for now this is just a "best effort"
1459                            // to find a key that hashes into a specific bucket. The probabilities
1460                            // of finding a key in the bucket `b` with at most 16 trials are as
1461                            // follows:
1462                            //
1463                            // Pr(bucket-255) = 1 - (1/2)^16   ~= 1
1464                            // Pr(bucket-254) = 1 - (3/4)^16   ~= 1
1465                            // Pr(bucket-253) = 1 - (7/8)^16   ~= 0.88
1466                            // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
1467                            // ...
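                            // In general, Pr(bucket-(255 - i)) = 1 - (1 - 2^-(i + 1))^16
                            // for i = 0, 1, 2, ...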
1468                            let mut target = kbucket::Key::from(PeerId::random());
1469                            for _ in 0..16 {
1470                                let d = local_key.distance(&target);
1471                                if b.contains(&d) {
1472                                    break;
1473                                }
1474                                target = kbucket::Key::from(PeerId::random());
1475                            }
1476                            target
1477                        })
1478                        .collect::<Vec<_>>()
1479                        .into_iter()
1480                });
1481
1482                let num_remaining = remaining.len() as u32;
1483
1484                if let Some(target) = remaining.next() {
1485                    let info = QueryInfo::Bootstrap {
1486                        peer: *target.preimage(),
1487                        remaining: Some(remaining),
1488                        step: step.next(),
1489                    };
1490                    let peers = self.kbuckets.closest_keys(&target);
1491                    self.queries
1492                        .continue_iter_closest(query_id, target, peers, info);
1493                } else {
1494                    step.last = true;
1495                    self.bootstrap_status.on_finish();
1496                }
1497
1498                Some(Event::OutboundQueryProgressed {
1499                    id: query_id,
1500                    stats: q.stats,
1501                    result: QueryResult::Bootstrap(Ok(BootstrapOk {
1502                        peer,
1503                        num_remaining,
1504                    })),
1505                    step,
1506                })
1507            }
1508
1509            QueryInfo::GetClosestPeers { key, mut step, .. } => {
1510                step.last = true;
1511
1512                Some(Event::OutboundQueryProgressed {
1513                    id: query_id,
1514                    stats: q.stats,
1515                    result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
1516                        key,
1517                        peers: q.peers.into_peerinfos_iter().collect(),
1518                    })),
1519                    step,
1520                })
1521            }
1522
1523            QueryInfo::GetProviders { mut step, .. } => {
1524                step.last = true;
1525
1526                Some(Event::OutboundQueryProgressed {
1527                    id: query_id,
1528                    stats: q.stats,
1529                    result: QueryResult::GetProviders(Ok(
1530                        GetProvidersOk::FinishedWithNoAdditionalRecord {
1531                            closest_peers: q.peers.into_peerids_iter().collect(),
1532                        },
1533                    )),
1534                    step,
1535                })
1536            }
1537
1538            QueryInfo::AddProvider {
1539                context,
1540                key,
1541                phase: AddProviderPhase::GetClosestPeers,
1542            } => {
1543                let provider_id = self.local_peer_id;
1544                let external_addresses = self.external_addresses.iter().cloned().collect();
1545                let info = QueryInfo::AddProvider {
1546                    context,
1547                    key,
1548                    phase: AddProviderPhase::AddProvider {
1549                        provider_id,
1550                        external_addresses,
1551                        get_closest_peers_stats: q.stats,
1552                    },
1553                };
1554                self.queries
1555                    .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1556                None
1557            }
1558
1559            QueryInfo::AddProvider {
1560                context,
1561                key,
1562                phase:
1563                    AddProviderPhase::AddProvider {
1564                        get_closest_peers_stats,
1565                        ..
1566                    },
1567            } => match context {
1568                AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
1569                    id: query_id,
1570                    stats: get_closest_peers_stats.merge(q.stats),
1571                    result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
1572                    step: ProgressStep::first_and_last(),
1573                }),
1574                AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
1575                    id: query_id,
1576                    stats: get_closest_peers_stats.merge(q.stats),
1577                    result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
1578                    step: ProgressStep::first_and_last(),
1579                }),
1580            },
1581
1582            QueryInfo::GetRecord {
1583                key,
1584                mut step,
1585                found_a_record,
1586                cache_candidates,
1587            } => {
1588                step.last = true;
1589
1590                let results = if found_a_record {
1591                    Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
1592                } else {
1593                    Err(GetRecordError::NotFound {
1594                        key,
1595                        closest_peers: q.peers.into_peerids_iter().collect(),
1596                    })
1597                };
1598                Some(Event::OutboundQueryProgressed {
1599                    id: query_id,
1600                    stats: q.stats,
1601                    result: QueryResult::GetRecord(results),
1602                    step,
1603                })
1604            }
1605
1606            QueryInfo::PutRecord {
1607                context,
1608                record,
1609                quorum,
1610                phase: PutRecordPhase::GetClosestPeers,
1611            } => {
1612                let info = QueryInfo::PutRecord {
1613                    context,
1614                    record,
1615                    quorum,
1616                    phase: PutRecordPhase::PutRecord {
1617                        success: vec![],
1618                        get_closest_peers_stats: q.stats,
1619                    },
1620                };
1621                self.queries
1622                    .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1623                None
1624            }
1625
1626            QueryInfo::PutRecord {
1627                context,
1628                record,
1629                quorum,
1630                phase:
1631                    PutRecordPhase::PutRecord {
1632                        success,
1633                        get_closest_peers_stats,
1634                    },
1635            } => {
1636                let mk_result = |key: record::Key| {
1637                    if success.len() >= quorum.get() {
1638                        Ok(PutRecordOk { key })
1639                    } else {
1640                        Err(PutRecordError::QuorumFailed {
1641                            key,
1642                            quorum,
1643                            success,
1644                        })
1645                    }
1646                };
1647                match context {
1648                    PutRecordContext::Publish | PutRecordContext::Custom => {
1649                        Some(Event::OutboundQueryProgressed {
1650                            id: query_id,
1651                            stats: get_closest_peers_stats.merge(q.stats),
1652                            result: QueryResult::PutRecord(mk_result(record.key)),
1653                            step: ProgressStep::first_and_last(),
1654                        })
1655                    }
1656                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1657                        id: query_id,
1658                        stats: get_closest_peers_stats.merge(q.stats),
1659                        result: QueryResult::RepublishRecord(mk_result(record.key)),
1660                        step: ProgressStep::first_and_last(),
1661                    }),
1662                    PutRecordContext::Replicate => {
1663                        tracing::debug!(record=?record.key, "Record replicated");
1664                        None
1665                    }
1666                }
1667            }
1668        }
1669    }
1670
1671    /// Handles a query that timed out.
1672    fn query_timeout(&mut self, query: Query) -> Option<Event> {
1673        let query_id = query.id();
1674        tracing::trace!(query=?query_id, "Query timed out");
1675        match query.info {
1676            QueryInfo::Bootstrap {
1677                peer,
1678                mut remaining,
1679                mut step,
1680            } => {
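                // The next target (started below, if any) is no longer counted
                // as remaining.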
1681                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
1682
1683                // Continue with the next bootstrap query if `remaining` is not empty.
1684                if let Some((target, remaining)) =
1685                    remaining.take().and_then(|mut r| Some((r.next()?, r)))
1686                {
1687                    let info = QueryInfo::Bootstrap {
1688                        peer: target.into_preimage(),
1689                        remaining: Some(remaining),
1690                        step: step.next(),
1691                    };
1692                    let peers = self.kbuckets.closest_keys(&target);
1693                    self.queries
1694                        .continue_iter_closest(query_id, target, peers, info);
1695                } else {
1696                    step.last = true;
1697                    self.bootstrap_status.on_finish();
1698                }
1699
1700                Some(Event::OutboundQueryProgressed {
1701                    id: query_id,
1702                    stats: query.stats,
1703                    result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
1704                        peer,
1705                        num_remaining,
1706                    })),
1707                    step,
1708                })
1709            }
1710
1711            QueryInfo::AddProvider { context, key, .. } => Some(match context {
1712                AddProviderContext::Publish => Event::OutboundQueryProgressed {
1713                    id: query_id,
1714                    stats: query.stats,
1715                    result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
1716                    step: ProgressStep::first_and_last(),
1717                },
1718                AddProviderContext::Republish => Event::OutboundQueryProgressed {
1719                    id: query_id,
1720                    stats: query.stats,
1721                    result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
1722                    step: ProgressStep::first_and_last(),
1723                },
1724            }),
1725
1726            QueryInfo::GetClosestPeers { key, mut step, .. } => {
1727                step.last = true;
1728                Some(Event::OutboundQueryProgressed {
1729                    id: query_id,
1730                    stats: query.stats,
1731                    result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
1732                        key,
1733                        peers: query.peers.into_peerinfos_iter().collect(),
1734                    })),
1735                    step,
1736                })
1737            }
1738
1739            QueryInfo::PutRecord {
1740                record,
1741                quorum,
1742                context,
1743                phase,
1744            } => {
1745                let err = Err(PutRecordError::Timeout {
1746                    key: record.key,
1747                    quorum,
1748                    success: match phase {
1749                        PutRecordPhase::GetClosestPeers => vec![],
1750                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
1751                    },
1752                });
1753                match context {
1754                    PutRecordContext::Publish | PutRecordContext::Custom => {
1755                        Some(Event::OutboundQueryProgressed {
1756                            id: query_id,
1757                            stats: query.stats,
1758                            result: QueryResult::PutRecord(err),
1759                            step: ProgressStep::first_and_last(),
1760                        })
1761                    }
1762                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1763                        id: query_id,
1764                        stats: query.stats,
1765                        result: QueryResult::RepublishRecord(err),
1766                        step: ProgressStep::first_and_last(),
1767                    }),
1768                    PutRecordContext::Replicate => match phase {
1769                        PutRecordPhase::GetClosestPeers => {
1770                            tracing::warn!(
1771                                "Locating closest peers for replication failed: {:?}",
1772                                err
1773                            );
1774                            None
1775                        }
1776                        PutRecordPhase::PutRecord { .. } => {
1777                            tracing::debug!("Replicating record failed: {:?}", err);
1778                            None
1779                        }
1780                    },
1781                }
1782            }
1783
1784            QueryInfo::GetRecord { key, mut step, .. } => {
1785                step.last = true;
1786
1787                Some(Event::OutboundQueryProgressed {
1788                    id: query_id,
1789                    stats: query.stats,
1790                    result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
1791                    step,
1792                })
1793            }
1794
1795            QueryInfo::GetProviders { key, mut step, .. } => {
1796                step.last = true;
1797
1798                Some(Event::OutboundQueryProgressed {
1799                    id: query_id,
1800                    stats: query.stats,
1801                    result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
1802                        key,
1803                        closest_peers: query.peers.into_peerids_iter().collect(),
1804                    })),
1805                    step,
1806                })
1807            }
1808        }
1809    }
1810
1811    /// Processes a record received from a peer.
1812    fn record_received(
1813        &mut self,
1814        source: PeerId,
1815        connection: ConnectionId,
1816        request_id: RequestId,
1817        mut record: Record,
1818    ) {
1819        if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1820            // If the (alleged) publisher is the local node, do nothing. The record of
1821            // the original publisher should never change as a result of replication
1822            // and the publisher is always assumed to have the "right" value.
1823            self.queued_events.push_back(ToSwarm::NotifyHandler {
1824                peer_id: source,
1825                handler: NotifyHandler::One(connection),
1826                event: HandlerIn::PutRecordRes {
1827                    key: record.key,
1828                    value: record.value,
1829                    request_id,
1830                },
1831            });
1832            return;
1833        }
1834
1835        let now = Instant::now();
1836
1837        // Calculate the expiration time, decreasing exponentially with the
1838        // number of nodes between the local node and the closest node to the
1839        // key (beyond the replication factor). This avoids over-caching
1840        // outside of the k closest nodes to a key.
1841        let target = kbucket::Key::new(record.key.clone());
1842        let num_between = self.kbuckets.count_nodes_between(&target);
1843        let k = self.queries.config().replication_factor.get();
1844        let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1845        let expiration = self
1846            .record_ttl
1847            .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
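        // E.g. with k = 20, 22 nodes between the local node and the key, and a
        // record TTL of 36h, `num_beyond_k` is 2 and the TTL is halved twice,
        // i.e. the record expires after 9h.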
1848        // The smaller TTL prevails: take the minimum over the expiration times
1849        // actually present (`Option::min` alone would treat `None` as smallest).
1850        // Only if neither TTL is set is the record stored "forever".
        record.expires = record.expires.into_iter().chain(expiration).min();
1851
1852        if let Some(job) = self.put_record_job.as_mut() {
1853            // Ignore the record in the next run of the replication
1854            // job, since we can assume the sender replicated the
1855            // record to the k closest peers. Effectively, only
1856            // one of the k closest peers performs a replication
1857            // in the configured interval, assuming a shared interval.
1858            job.skip(record.key.clone())
1859        }
1860
1861        // While records received from a publisher, as well as records that do
1862        // not exist locally, should always be stored (or at least attempted),
1863        // there is a choice here w.r.t. the handling of replicated records
1864        // whose keys refer to records that exist locally: the value and / or
1865        // the publisher may either be overridden or left unchanged. At the
1866        // moment, in the absence of a decisive argument for another option,
1867        // both are always overridden, as this avoids having to load the
1868        // existing record in the first place.
1869
1870        if !record.is_expired(now) {
1871            // The record is cloned because of the weird libp2p protocol
1872            // requirement to send back the value in the response, although this
1873            // is a waste of resources.
1874            match self.record_filtering {
1875                StoreInserts::Unfiltered => match self.store.put(record.clone()) {
1876                    Ok(()) => {
1877                        tracing::debug!(
1878                            record=?record.key,
1879                            "Record stored: {} bytes",
1880                            record.value.len()
1881                        );
1882                        self.queued_events.push_back(ToSwarm::GenerateEvent(
1883                            Event::InboundRequest {
1884                                request: InboundRequest::PutRecord {
1885                                    source,
1886                                    connection,
1887                                    record: None,
1888                                },
1889                            },
1890                        ));
1891                    }
1892                    Err(e) => {
1893                        tracing::info!("Record not stored: {:?}", e);
1894                        self.queued_events.push_back(ToSwarm::NotifyHandler {
1895                            peer_id: source,
1896                            handler: NotifyHandler::One(connection),
1897                            event: HandlerIn::Reset(request_id),
1898                        });
1899
1900                        return;
1901                    }
1902                },
1903                StoreInserts::FilterBoth => {
1904                    self.queued_events
1905                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1906                            request: InboundRequest::PutRecord {
1907                                source,
1908                                connection,
1909                                record: Some(record.clone()),
1910                            },
1911                        }));
1912                }
1913            }
1914        }
1915
1916        // The remote receives a [`HandlerIn::PutRecordRes`] even in the
1917        // case where the record is discarded due to being expired. Given that
1918        // the remote sent the local node a [`HandlerEvent::PutRecord`]
1919        // request, the remote perceives the local node as one node among the k
1920        // closest nodes to the target. In addition, returning
1921        // [`HandlerIn::PutRecordRes`] does not reveal any internal
1922        // information to a possibly malicious remote node.
1923        self.queued_events.push_back(ToSwarm::NotifyHandler {
1924            peer_id: source,
1925            handler: NotifyHandler::One(connection),
1926            event: HandlerIn::PutRecordRes {
1927                key: record.key,
1928                value: record.value,
1929                request_id,
1930            },
1931        })
1932    }
1933
1934    /// Processes a provider record received from a peer.
1935    fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1936        if &provider.node_id != self.kbuckets.local_key().preimage() {
1937            let record = ProviderRecord {
1938                key,
1939                provider: provider.node_id,
1940                expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1941                addresses: provider.multiaddrs,
1942            };
1943            match self.record_filtering {
1944                StoreInserts::Unfiltered => {
1945                    if let Err(e) = self.store.add_provider(record) {
1946                        tracing::info!("Provider record not stored: {:?}", e);
1947                        return;
1948                    }
1949
1950                    self.queued_events
1951                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1952                            request: InboundRequest::AddProvider { record: None },
1953                        }));
1954                }
1955                StoreInserts::FilterBoth => {
1956                    self.queued_events
1957                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1958                            request: InboundRequest::AddProvider {
1959                                record: Some(record),
1960                            },
1961                        }));
1962                }
1963            }
1964        }
1965    }
1966
1967    fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1968        let key = kbucket::Key::from(peer_id);
1969
1970        if let Some(addrs) = self.kbuckets.entry(&key).as_mut().and_then(|e| e.value()) {
1971            // TODO: Ideally, the address should only be removed if the error can
1972            // be classified as "permanent" but since `err` is currently a borrowed
1973            // trait object without a `'static` bound, even downcasting for inspection
1974            // of the error is not possible (and also not truly desirable or ergonomic).
1975            // The error passed in should rather be a dedicated enum.
1976            if addrs.remove(address).is_ok() {
1977                tracing::debug!(
1978                    peer=%peer_id,
1979                    %address,
1980                    "Address removed from peer due to error."
1981                );
1982            } else {
1983                // Despite apparently having no reachable address (any longer),
1984                // the peer is kept in the routing table with its last address, so that
1985                // a (temporary) loss of network connectivity does not "flush" the
1986                // routing table. Once in, a peer is only removed from the routing table
1987                // if it is the least recently connected peer, is currently disconnected,
1988                // and is unreachable in the context of another peer pending insertion
1989                // into the same bucket. This is handled transparently by the
1990                // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
1991                // within `Behaviour::poll`.
1992                tracing::debug!(
1993                    peer=%peer_id,
1994                    %address,
1995                    "Last remaining address of peer is unreachable."
1996                );
1997            }
1998        }
1999
2000        for query in self.queries.iter_mut() {
2001            if let Some(addrs) = query.peers.addresses.get_mut(&peer_id) {
2002                addrs.retain(|a| a != address);
2003            }
2004        }
2005    }
2006
2007    fn on_connection_established(
2008        &mut self,
2009        ConnectionEstablished {
2010            peer_id,
2011            failed_addresses,
2012            other_established,
2013            ..
2014        }: ConnectionEstablished,
2015    ) {
2016        for addr in failed_addresses {
2017            self.address_failed(peer_id, addr);
2018        }
2019
2020        // Peer's first connection.
2021        if other_established == 0 {
2022            self.connected_peers.insert(peer_id);
2023        }
2024    }
2025
2026    fn on_address_change(
2027        &mut self,
2028        AddressChange {
2029            peer_id: peer,
2030            old,
2031            new,
2032            ..
2033        }: AddressChange,
2034    ) {
2035        let (old, new) = (old.get_remote_address(), new.get_remote_address());
2036
2037        // Update routing table.
2038        if let Some(addrs) = self
2039            .kbuckets
2040            .entry(&kbucket::Key::from(peer))
2041            .as_mut()
2042            .and_then(|e| e.value())
2043        {
2044            if addrs.replace(old, new) {
2045                tracing::debug!(
2046                    %peer,
2047                    old_address=%old,
2048                    new_address=%new,
2049                    "Old address replaced with new address for peer."
2050                );
2051            } else {
2052                tracing::debug!(
2053                    %peer,
2054                    old_address=%old,
2055                    new_address=%new,
2056                    "Old address not replaced with new address for peer as old address wasn't present.",
2057                );
2058            }
2059        } else {
2060            tracing::debug!(
2061                %peer,
2062                old_address=%old,
2063                new_address=%new,
2064                "Old address not replaced with new address for peer as peer is not present in the \
2065                 routing table."
2066            );
2067        }
2068
2069        // Update query address cache.
2070        //
2071        // Given two connected nodes: local node A and remote node B. Say node B
2072        // is not in node A's routing table. Additionally, node B is part of the
2073        // `Query::addresses` list of an ongoing query on node A. Say node
2074        // B triggers an address change and then disconnects. Later on, the
2075        // aforementioned query on node A would like to connect to node B.
2076        // Without replacing the address in the `Query::addresses` set, node
2077        // A would attempt to dial the old and not the new address.
2078        //
2079        // While upholding correctness, iterating through all discovered
2080        // addresses of a peer in all currently ongoing queries might have a
2081        // large performance impact. If so, the code below might be worth
2082        // revisiting.
2083        for query in self.queries.iter_mut() {
2084            if let Some(addrs) = query.peers.addresses.get_mut(&peer) {
2085                for addr in addrs.iter_mut() {
2086                    if addr == old {
2087                        *addr = new.clone();
2088                    }
2089                }
2090            }
2091        }
2092    }
2093
2094    fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
2095        let Some(peer_id) = peer_id else { return };
2096
2097        match error {
2098            DialError::LocalPeerId { .. }
2099            | DialError::WrongPeerId { .. }
2100            | DialError::Aborted
2101            | DialError::Denied { .. }
2102            | DialError::Transport(_)
2103            | DialError::NoAddresses => {
2104                if let DialError::Transport(addresses) = error {
2105                    for (addr, _) in addresses {
2106                        self.address_failed(peer_id, addr)
2107                    }
2108                }
2109
2110                for query in self.queries.iter_mut() {
2111                    query.on_failure(&peer_id);
2112                }
2113            }
2114            DialError::DialPeerConditionFalse(
2115                dial_opts::PeerCondition::Disconnected
2116                | dial_opts::PeerCondition::NotDialing
2117                | dial_opts::PeerCondition::DisconnectedAndNotDialing,
2118            ) => {
2119                // We might (still) be connected, or about to be connected, thus do not report the
2120                // failure to the queries.
2121            }
2122            DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2123                unreachable!("DialPeerCondition::Always cannot trigger DialPeerConditionFalse.");
2124            }
2125        }
2126    }
2127
2128    fn on_connection_closed(
2129        &mut self,
2130        ConnectionClosed {
2131            peer_id,
2132            remaining_established,
2133            connection_id,
2134            ..
2135        }: ConnectionClosed,
2136    ) {
2137        self.connections.remove(&connection_id);
2138
2139        if remaining_established == 0 {
2140            for query in self.queries.iter_mut() {
2141                query.on_failure(&peer_id);
2142            }
2143            self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2144            self.connected_peers.remove(&peer_id);
2145        }
2146    }
2147
2148    /// Preloads a new [`Handler`] with requests that are waiting
2149    /// to be sent to the newly connected peer.
2150    fn preload_new_handler(
2151        &mut self,
2152        handler: &mut Handler,
2153        connection_id: ConnectionId,
2154        peer: PeerId,
2155    ) {
2156        self.connections.insert(connection_id, peer);
2157        // Queue events for sending pending RPCs to the connected peer.
2158        // By definition, there can be only one pending RPC per peer and query.
2159        for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2160            q.pending_rpcs
2161                .iter()
2162                .position(|(p, _)| p == &peer)
2163                .map(|p| q.pending_rpcs.remove(p))
2164        }) {
2165            handler.on_behaviour_event(event)
2166        }
2167    }
2168}
2169
2170/// Exponentially decreases the given duration (base 2).
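///
/// E.g. `exp_decrease(Duration::from_secs(3600), 2)` yields a duration of
/// 900 seconds. Shifts of 64 bits or more yield a zero duration, and
/// sub-second precision is always discarded.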
2171fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
2172    Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
2173}
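
// A minimal, self-contained sketch exercising `exp_decrease` above; the
// module name is illustrative and not part of the original API surface.
#[cfg(test)]
mod exp_decrease_sketch {
    use super::*;

    #[test]
    fn halves_once_per_exponent() {
        // 36h halved twice is 9h, matching the record expiration example
        // in `record_received`.
        assert_eq!(
            exp_decrease(Duration::from_secs(36 * 3600), 2),
            Duration::from_secs(9 * 3600)
        );
        // Shifts of 64 bits or more saturate to a zero duration.
        assert_eq!(exp_decrease(Duration::from_secs(3600), 64), Duration::ZERO);
    }
}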
2174
2175impl<TStore> NetworkBehaviour for Behaviour<TStore>
2176where
2177    TStore: RecordStore + Send + 'static,
2178{
2179    type ConnectionHandler = Handler;
2180    type ToSwarm = Event;
2181
2182    fn handle_established_inbound_connection(
2183        &mut self,
2184        connection_id: ConnectionId,
2185        peer: PeerId,
2186        local_addr: &Multiaddr,
2187        remote_addr: &Multiaddr,
2188    ) -> Result<THandler<Self>, ConnectionDenied> {
2189        let connected_point = ConnectedPoint::Listener {
2190            local_addr: local_addr.clone(),
2191            send_back_addr: remote_addr.clone(),
2192        };
2193
2194        let mut handler = Handler::new(
2195            self.protocol_config.clone(),
2196            connected_point,
2197            peer,
2198            self.mode,
2199        );
2200        self.preload_new_handler(&mut handler, connection_id, peer);
2201
2202        Ok(handler)
2203    }
2204
2205    fn handle_established_outbound_connection(
2206        &mut self,
2207        connection_id: ConnectionId,
2208        peer: PeerId,
2209        addr: &Multiaddr,
2210        role_override: Endpoint,
2211        port_use: PortUse,
2212    ) -> Result<THandler<Self>, ConnectionDenied> {
2213        let connected_point = ConnectedPoint::Dialer {
2214            address: addr.clone(),
2215            role_override,
2216            port_use,
2217        };
2218
2219        let mut handler = Handler::new(
2220            self.protocol_config.clone(),
2221            connected_point,
2222            peer,
2223            self.mode,
2224        );
2225        self.preload_new_handler(&mut handler, connection_id, peer);
2226
2227        Ok(handler)
2228    }
2229
2230    fn handle_pending_outbound_connection(
2231        &mut self,
2232        _connection_id: ConnectionId,
2233        maybe_peer: Option<PeerId>,
2234        _addresses: &[Multiaddr],
2235        _effective_role: Endpoint,
2236    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
2237        let peer_id = match maybe_peer {
2238            None => return Ok(vec![]),
2239            Some(peer) => peer,
2240        };
2241
2242        // We should order addresses by decreasing likelihood of connectivity, so start with
2243        // the addresses of that peer in the k-buckets.
2244        let key = kbucket::Key::from(peer_id);
2245        let mut peer_addrs =
2246            if let Some(kbucket::Entry::Present(mut entry, _)) = self.kbuckets.entry(&key) {
2247                let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
2248                debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
2249                addrs
2250            } else {
2251                Vec::new()
2252            };
2253
2254        // We add to that a temporary list of addresses from the ongoing queries.
2255        for query in self.queries.iter() {
2256            if let Some(addrs) = query.peers.addresses.get(&peer_id) {
2257                peer_addrs.extend(addrs.iter().cloned())
2258            }
2259        }
2260
2261        Ok(peer_addrs)
2262    }
2263
2264    fn on_connection_handler_event(
2265        &mut self,
2266        source: PeerId,
2267        connection: ConnectionId,
2268        event: THandlerOutEvent<Self>,
2269    ) {
2270        match event {
2271            HandlerEvent::ProtocolConfirmed { endpoint } => {
2272                debug_assert!(self.connected_peers.contains(&source));
2273                // The remote's address can only be put into the routing table,
2274                // and thus shared with other nodes, if the local node is the dialer,
2275                // since the remote address on an inbound connection may be specific
2276                // to that connection (e.g. typically the TCP port numbers).
2277                let address = match endpoint {
2278                    ConnectedPoint::Dialer { address, .. } => Some(address),
2279                    ConnectedPoint::Listener { .. } => None,
2280                };
2281
2282                self.connection_updated(source, address, NodeStatus::Connected);
2283            }
2284
2285            HandlerEvent::ProtocolNotSupported { endpoint } => {
2286                let address = match endpoint {
2287                    ConnectedPoint::Dialer { address, .. } => Some(address),
2288                    ConnectedPoint::Listener { .. } => None,
2289                };
2290                self.connection_updated(source, address, NodeStatus::Disconnected);
2291            }
2292
2293            HandlerEvent::FindNodeReq { key, request_id } => {
2294                let closer_peers = self
2295                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
2296                    .collect::<Vec<_>>();
2297
2298                self.queued_events
2299                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2300                        request: InboundRequest::FindNode {
2301                            num_closer_peers: closer_peers.len(),
2302                        },
2303                    }));
2304
2305                self.queued_events.push_back(ToSwarm::NotifyHandler {
2306                    peer_id: source,
2307                    handler: NotifyHandler::One(connection),
2308                    event: HandlerIn::FindNodeRes {
2309                        closer_peers,
2310                        request_id,
2311                    },
2312                });
2313            }
2314
2315            HandlerEvent::FindNodeRes {
2316                closer_peers,
2317                query_id,
2318            } => {
2319                self.discovered(&query_id, &source, closer_peers.iter());
2320            }
2321
2322            HandlerEvent::GetProvidersReq { key, request_id } => {
2323                let provider_peers = self.provider_peers(&key, &source);
2324                let closer_peers = self
2325                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
2326                    .collect::<Vec<_>>();
2327
2328                self.queued_events
2329                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2330                        request: InboundRequest::GetProvider {
2331                            num_closer_peers: closer_peers.len(),
2332                            num_provider_peers: provider_peers.len(),
2333                        },
2334                    }));
2335
2336                self.queued_events.push_back(ToSwarm::NotifyHandler {
2337                    peer_id: source,
2338                    handler: NotifyHandler::One(connection),
2339                    event: HandlerIn::GetProvidersRes {
2340                        closer_peers,
2341                        provider_peers,
2342                        request_id,
2343                    },
2344                });
2345            }
2346
2347            HandlerEvent::GetProvidersRes {
2348                closer_peers,
2349                provider_peers,
2350                query_id,
2351            } => {
2352                let peers = closer_peers.iter().chain(provider_peers.iter());
2353                self.discovered(&query_id, &source, peers);
2354                if let Some(query) = self.queries.get_mut(&query_id) {
2355                    let stats = query.stats().clone();
2356                    if let QueryInfo::GetProviders {
2357                        ref key,
2358                        ref mut providers_found,
2359                        ref mut step,
2360                        ..
2361                    } = query.info
2362                    {
2363                        *providers_found += provider_peers.len();
2364                        let providers = provider_peers.iter().map(|p| p.node_id).collect();
2365
2366                        self.queued_events.push_back(ToSwarm::GenerateEvent(
2367                            Event::OutboundQueryProgressed {
2368                                id: query_id,
2369                                result: QueryResult::GetProviders(Ok(
2370                                    GetProvidersOk::FoundProviders {
2371                                        key: key.clone(),
2372                                        providers,
2373                                    },
2374                                )),
2375                                step: step.clone(),
2376                                stats,
2377                            },
2378                        ));
2379                        *step = step.next();
2380                    }
2381                }
2382            }
2383            HandlerEvent::QueryError { query_id, error } => {
2384                tracing::debug!(
2385                    peer=%source,
2386                    query=?query_id,
2387                    "Request to peer in query failed with {:?}",
2388                    error
2389                );
2390                // If the query to which the error relates is still active,
2391                // signal the failure w.r.t. `source`.
2392                if let Some(query) = self.queries.get_mut(&query_id) {
2393                    query.on_failure(&source)
2394                }
2395            }
2396
2397            HandlerEvent::AddProvider { key, provider } => {
2398                // Only accept a provider record from a legitimate peer.
2399                if provider.node_id != source {
2400                    return;
2401                }
2402
2403                self.provider_received(key, provider);
2404            }
2405
2406            HandlerEvent::GetRecord { key, request_id } => {
2407                // Lookup the record locally.
2408                let record = match self.store.get(&key) {
2409                    Some(record) => {
2410                        if record.is_expired(Instant::now()) {
2411                            self.store.remove(&key);
2412                            None
2413                        } else {
2414                            Some(record.into_owned())
2415                        }
2416                    }
2417                    None => None,
2418                };
2419
2420                let closer_peers = self
2421                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
2422                    .collect::<Vec<_>>();
2423
2424                self.queued_events
2425                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2426                        request: InboundRequest::GetRecord {
2427                            num_closer_peers: closer_peers.len(),
2428                            present_locally: record.is_some(),
2429                        },
2430                    }));
2431
2432                self.queued_events.push_back(ToSwarm::NotifyHandler {
2433                    peer_id: source,
2434                    handler: NotifyHandler::One(connection),
2435                    event: HandlerIn::GetRecordRes {
2436                        record,
2437                        closer_peers,
2438                        request_id,
2439                    },
2440                });
2441            }
2442
2443            HandlerEvent::GetRecordRes {
2444                record,
2445                closer_peers,
2446                query_id,
2447            } => {
2448                if let Some(query) = self.queries.get_mut(&query_id) {
2449                    let stats = query.stats().clone();
2450                    if let QueryInfo::GetRecord {
2451                        key,
2452                        ref mut step,
2453                        ref mut found_a_record,
2454                        cache_candidates,
2455                    } = &mut query.info
2456                    {
2457                        if let Some(record) = record {
2458                            *found_a_record = true;
2459                            let record = PeerRecord {
2460                                peer: Some(source),
2461                                record,
2462                            };
2463
2464                            self.queued_events.push_back(ToSwarm::GenerateEvent(
2465                                Event::OutboundQueryProgressed {
2466                                    id: query_id,
2467                                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
2468                                        record,
2469                                    ))),
2470                                    step: step.clone(),
2471                                    stats,
2472                                },
2473                            ));
2474
2475                            *step = step.next();
2476                        } else {
2477                            tracing::trace!(record=?key, %source, "Record not found at source");
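                            // Peers among the closest to the key that did not
                            // return the record are collected as candidates for
                            // caching the record via `put_record_to`, cf.
                            // `GetRecordOk::FinishedWithNoAdditionalRecord`.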
2478                            if let Caching::Enabled { max_peers } = self.caching {
2479                                let source_key = kbucket::Key::from(source);
2480                                let target_key = kbucket::Key::from(key.clone());
2481                                let distance = source_key.distance(&target_key);
2482                                cache_candidates.insert(distance, source);
2483                                if cache_candidates.len() > max_peers as usize {
2484                                    // Evict the candidate furthest from the key.
2485                                    // `BTreeMap::pop_last` has been stable since
2486                                    // Rust 1.66, see
2487                                    // https://github.com/rust-lang/rust/issues/62924.
2488                                    cache_candidates.pop_last();
2489                                }
2490                            }
2491                        }
2492                    }
2493                }
2494
                self.discovered(&query_id, &source, closer_peers.iter());
            }

            HandlerEvent::PutRecord { record, request_id } => {
                self.record_received(source, connection, request_id, record);
            }

            HandlerEvent::PutRecordRes { query_id, .. } => {
                if let Some(query) = self.queries.get_mut(&query_id) {
                    query.on_success(&source, vec![]);
                    if let QueryInfo::PutRecord {
                        phase: PutRecordPhase::PutRecord { success, .. },
                        quorum,
                        ..
                    } = &mut query.info
                    {
                        success.push(source);

                        let quorum = quorum.get();
                        if success.len() >= quorum {
                            let peers = success.clone();
                            let finished = query.try_finish(peers.iter());
                            if !finished {
                                tracing::debug!(
                                    peer=%source,
                                    query=?query_id,
                                    "PutRecord query reached quorum ({}/{}) with response \
                                     from peer but could not yet finish.",
                                    peers.len(),
                                    quorum,
                                );
                            }
                        }
                    }
                }
            }
        };
    }

    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
        let now = Instant::now();

        // Calculate the available capacity for queries triggered by background jobs.
        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());

        // Run the periodic provider announcement job.
        if let Some(mut job) = self.add_provider_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for i in 0..num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    self.start_add_provider(r.key, AddProviderContext::Republish)
                } else {
                    jobs_query_capacity -= i;
                    break;
                }
            }
            self.add_provider_job = Some(job);
        }

        // Run the periodic record replication / publication job.
        if let Some(mut job) = self.put_record_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0..num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    let context =
                        if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                            PutRecordContext::Republish
                        } else {
                            PutRecordContext::Replicate
                        };
                    self.start_put_record(r, Quorum::All, context)
                } else {
                    break;
                }
            }
            self.put_record_job = Some(job);
        }

        // Poll bootstrap periodically and automatically.
        if let Poll::Ready(()) = self.bootstrap_status.poll_next_bootstrap(cx) {
            if let Err(e) = self.bootstrap() {
                tracing::warn!("Failed to trigger bootstrap: {e}");
            }
        }

        loop {
            // Drain queued events first.
            if let Some(event) = self.queued_events.pop_front() {
                return Poll::Ready(event);
            }

            // Drain applied pending entries from the routing table.
            if let Some(entry) = self.kbuckets.take_applied_pending() {
                let kbucket::Node { key, value } = entry.inserted;
                let peer_id = key.into_preimage();
                self.queued_events
                    .push_back(ToSwarm::NewExternalAddrOfPeer {
                        peer_id,
                        address: value.first().clone(),
                    });
                let event = Event::RoutingUpdated {
                    bucket_range: self
                        .kbuckets
                        .bucket(&key)
                        .map(|b| b.range())
                        .expect("Self to never be applied from pending."),
                    peer: peer_id,
                    is_new_peer: true,
                    addresses: value,
                    old_peer: entry.evicted.map(|n| n.key.into_preimage()),
                };
                return Poll::Ready(ToSwarm::GenerateEvent(event));
            }

            // Look for a finished query.
            loop {
                match self.queries.poll(now) {
                    QueryPoolState::Finished(q) => {
                        if let Some(event) = self.query_finished(q) {
                            return Poll::Ready(ToSwarm::GenerateEvent(event));
                        }
                    }
                    QueryPoolState::Timeout(q) => {
                        if let Some(event) = self.query_timeout(q) {
                            return Poll::Ready(ToSwarm::GenerateEvent(event));
                        }
                    }
                    QueryPoolState::Waiting(Some((query, peer_id))) => {
                        let event = query.info.to_request(query.id());
                        // TODO: AddProvider requests yield no response, so the query completes
                        // as soon as all requests have been sent. It would be better if the
                        // handler emitted an event once the request has been sent (and
                        // reported an error if sending fails), instead of immediately
                        // reporting "success" somewhat prematurely here.
                        if let QueryInfo::AddProvider {
                            phase: AddProviderPhase::AddProvider { .. },
                            ..
                        } = &query.info
                        {
                            query.on_success(&peer_id, vec![])
                        }

                        if self.connected_peers.contains(&peer_id) {
                            self.queued_events.push_back(ToSwarm::NotifyHandler {
                                peer_id,
                                event,
                                handler: NotifyHandler::Any,
                            });
                        } else if &peer_id != self.kbuckets.local_key().preimage() {
                            query.pending_rpcs.push((peer_id, event));
                            self.queued_events.push_back(ToSwarm::Dial {
                                opts: DialOpts::peer_id(peer_id).build(),
                            });
                        }
                    }
                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
                }
            }

            // No immediate event was produced as a result of a finished query.
            // If no new events have been queued either, signal `Poll::Pending`
            // to be polled again later.
            if self.queued_events.is_empty() {
                self.no_events_waker = Some(cx.waker().clone());

                return Poll::Pending;
            }
        }
    }

    fn on_swarm_event(&mut self, event: FromSwarm) {
        self.listen_addresses.on_swarm_event(&event);
        let external_addresses_changed = self.external_addresses.on_swarm_event(&event);

        if self.auto_mode && external_addresses_changed {
            self.determine_mode_from_external_addresses();
        }

        match event {
            FromSwarm::ConnectionEstablished(connection_established) => {
                self.on_connection_established(connection_established)
            }
            FromSwarm::ConnectionClosed(connection_closed) => {
                self.on_connection_closed(connection_closed)
            }
            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
            FromSwarm::NewListenAddr(_) if self.connected_peers.is_empty() => {
                // A new listen address was just discovered and we have no connected peers.
                // This can mean that our network interfaces were down but have come back up,
                // so it is a good time to trigger a bootstrap.
                self.bootstrap_status.trigger();
            }
            _ => {}
        }
    }
}

/// `PeerInfo` combines a peer ID with a set of multiaddrs that the peer is listening on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerInfo {
    pub peer_id: PeerId,
    pub addrs: Vec<Multiaddr>,
}

/// A quorum w.r.t. the configured replication factor specifies the minimum
/// number of distinct nodes that must be successfully contacted in order
/// for a query to succeed.
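///
/// # Example
///
/// A sketch of how each variant evaluates against a total of 20 peers
/// (the default `K_VALUE`); `eval` is crate-private, so this is
/// illustrative rather than a runnable doc-test:
///
/// ```ignore
/// use std::num::NonZeroUsize;
///
/// let total = NonZeroUsize::new(20).unwrap();
/// assert_eq!(Quorum::One.eval(total).get(), 1); // always a single peer
/// assert_eq!(Quorum::Majority.eval(total).get(), 11); // 20 / 2 + 1
/// assert_eq!(Quorum::All.eval(total).get(), 20); // every contacted peer
/// // `N(n)` is capped at the total: asking for 30 of 20 yields 20.
/// assert_eq!(Quorum::N(NonZeroUsize::new(30).unwrap()).eval(total).get(), 20);
/// ```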
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Quorum {
    One,
    Majority,
    All,
    N(NonZeroUsize),
}

impl Quorum {
    /// Evaluates the quorum w.r.t. a given total (number of peers).
    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
        match self {
            Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
            Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
            Quorum::All => total,
            Quorum::N(n) => NonZeroUsize::min(total, *n),
        }
    }
}

/// A record either received from the given peer or retrieved from the local
/// record store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
    /// The peer from whom the record was received. `None` if the record was
    /// retrieved from local storage.
    pub peer: Option<PeerId>,
    pub record: Record,
}

//////////////////////////////////////////////////////////////////////////////
// Events

/// The events produced by the `Kademlia` behaviour.
///
/// See [`NetworkBehaviour::poll`].
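///
/// # Example
///
/// A minimal sketch of draining these events from a swarm loop; the
/// surrounding `Swarm` setup and the `futures::StreamExt` import are
/// assumed:
///
/// ```ignore
/// loop {
///     match swarm.select_next_some().await {
///         SwarmEvent::Behaviour(Event::OutboundQueryProgressed { id, result, step, .. }) => {
///             // A single query may yield several progress events;
///             // `step.last` marks the final one.
///             if step.last {
///                 tracing::info!(query=?id, ?result, "query finished");
///             }
///         }
///         SwarmEvent::Behaviour(Event::RoutingUpdated { peer, is_new_peer, .. }) => {
///             tracing::debug!(%peer, is_new_peer, "routing table updated");
///         }
///         _ => {}
///     }
/// }
/// ```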
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// An inbound request has been received and handled.
    // Note on the difference between 'request' and 'query': A request is a
    // single request-response style exchange with a single remote peer. A query
    // is made of multiple requests across multiple remote peers.
    InboundRequest { request: InboundRequest },

    /// An outbound query has made progress.
    OutboundQueryProgressed {
        /// The ID of the query that made progress.
        id: QueryId,
        /// The intermediate result of the query.
        result: QueryResult,
        /// Execution statistics from the query.
        stats: QueryStats,
        /// Indicates which event this is, if there are multiple responses for a single query.
        step: ProgressStep,
    },

    /// The routing table has been updated with a new peer and / or
    /// address, thereby possibly evicting another peer.
    RoutingUpdated {
        /// The ID of the peer that was added or updated.
        peer: PeerId,
        /// Whether this is a new peer that was just added to the routing
        /// table, or an existing peer whose addresses have changed.
        is_new_peer: bool,
        /// The full list of known addresses of `peer`.
        addresses: Addresses,
        /// The minimum and maximum (both inclusive) distance for
        /// the bucket of the peer.
        bucket_range: (Distance, Distance),
        /// The ID of the peer that was evicted from the routing table to make
        /// room for the new peer, if any.
        old_peer: Option<PeerId>,
    },

    /// A peer has connected for whom no listen address is known.
    ///
    /// If the peer is to be added to the routing table, a known
    /// listen address for the peer must be provided via [`Behaviour::add_address`].
    UnroutablePeer { peer: PeerId },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer has not been added to the routing table either
    /// because [`BucketInserts::Manual`] is configured or because
    /// the corresponding bucket is full.
    ///
    /// If the peer is to be included in the routing table, it must
    /// be explicitly added via [`Behaviour::add_address`], possibly after
    /// removing another peer.
    ///
    /// See [`Behaviour::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    RoutablePeer { peer: PeerId, address: Multiaddr },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer is only pending insertion into the routing table
    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
    /// may not make it into the routing table.
    ///
    /// If the peer is to be unconditionally included in the routing table,
    /// it should be explicitly added via [`Behaviour::add_address`] after
    /// removing another peer.
    ///
    /// See [`Behaviour::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    PendingRoutablePeer { peer: PeerId, address: Multiaddr },

    /// This peer's mode has been updated automatically.
    ///
    /// This happens in response to an external
    /// address being added or removed.
    ModeChanged { new_mode: Mode },
}

/// Information about progress events.
#[derive(Debug, Clone)]
pub struct ProgressStep {
    /// The index of this progress event (starting at 1).
    pub count: NonZeroUsize,
    /// Is this the final event?
    pub last: bool,
}

impl ProgressStep {
    fn first() -> Self {
        Self {
            count: NonZeroUsize::new(1).expect("1 to be greater than 0."),
            last: false,
        }
    }

    fn first_and_last() -> Self {
        let mut first = ProgressStep::first();
        first.last = true;
        first
    }

    fn next(&self) -> Self {
        assert!(!self.last);
        let count = NonZeroUsize::new(self.count.get() + 1).expect("Adding 1 not to result in 0.");

        Self { count, last: false }
    }
}

/// Information about a received and handled inbound request.
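///
/// # Example
///
/// A sketch of manually handling a filtered put-record request when
/// [`StoreInserts::FilterBoth`] is configured; the swarm loop and the
/// `is_acceptable` validation helper are assumptions for illustration:
///
/// ```ignore
/// if let Event::InboundRequest {
///     request: InboundRequest::PutRecord { record: Some(record), .. },
/// } = event
/// {
///     // With filtering enabled, the record is *not* stored automatically;
///     // validate it and write it to the record store explicitly.
///     if is_acceptable(&record) {
///         swarm
///             .behaviour_mut()
///             .store_mut()
///             .put(record)
///             .expect("failed to store record locally");
///     }
/// }
/// ```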
#[derive(Debug, Clone)]
pub enum InboundRequest {
    /// Request for the list of nodes whose IDs are the closest to `key`.
    FindNode { num_closer_peers: usize },
    /// Same as `FindNode`, but should also return the entries of the local
    /// providers list for this key.
    GetProvider {
        num_closer_peers: usize,
        num_provider_peers: usize,
    },
    /// A peer sent an add provider request.
    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
    /// included.
    ///
    /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details.
    AddProvider { record: Option<ProviderRecord> },
    /// Request to retrieve a record.
    GetRecord {
        num_closer_peers: usize,
        present_locally: bool,
    },
    /// A peer sent a put record request.
    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
    ///
    /// See [`StoreInserts`] and [`Config::set_record_filtering`].
    PutRecord {
        source: PeerId,
        connection: ConnectionId,
        record: Option<Record>,
    },
}

/// The results of Kademlia queries.
#[derive(Debug, Clone)]
pub enum QueryResult {
    /// The result of [`Behaviour::bootstrap`].
    Bootstrap(BootstrapResult),

    /// The result of [`Behaviour::get_closest_peers`].
    GetClosestPeers(GetClosestPeersResult),

    /// The result of [`Behaviour::get_providers`].
    GetProviders(GetProvidersResult),

    /// The result of [`Behaviour::start_providing`].
    StartProviding(AddProviderResult),

    /// The result of an (automatic) republishing of a provider record.
    RepublishProvider(AddProviderResult),

    /// The result of [`Behaviour::get_record`].
    GetRecord(GetRecordResult),

    /// The result of [`Behaviour::put_record`].
    PutRecord(PutRecordResult),

    /// The result of an (automatic) republishing of a (value-)record.
    RepublishRecord(PutRecordResult),
}

/// The result of [`Behaviour::get_record`].
pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;

/// The successful result of [`Behaviour::get_record`].
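///
/// # Example
///
/// A sketch of writing a previously retrieved `record` back to the
/// reported cache candidates via [`Behaviour::put_record_to`]; the
/// surrounding event handling is assumed:
///
/// ```ignore
/// if let GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates } = ok {
///     // Candidates are ordered by distance to the record key, closest first.
///     let peers: Vec<_> = cache_candidates.into_values().collect();
///     behaviour.put_record_to(record, peers.into_iter(), Quorum::One);
/// }
/// ```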
#[derive(Debug, Clone)]
pub enum GetRecordOk {
    FoundRecord(PeerRecord),
    FinishedWithNoAdditionalRecord {
        /// If caching is enabled, these are the peers closest
        /// _to the record key_ (not the local node) that were queried but
        /// did not return the record, sorted by distance to the record key
        /// from closest to farthest. How many of these are tracked is configured
        /// by [`Config::set_caching`].
        ///
        /// Writing the record back to the cache at these peers is a manual
        /// operation, i.e. you may wish to use these candidates with
        /// [`Behaviour::put_record_to`] after selecting one of the returned records.
        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
    },
}

/// The error result of [`Behaviour::get_record`].
#[derive(Debug, Clone, Error)]
pub enum GetRecordError {
    #[error("the record was not found")]
    NotFound {
        key: record::Key,
        closest_peers: Vec<PeerId>,
    },
    #[error("the quorum failed; needed {quorum} peers")]
    QuorumFailed {
        key: record::Key,
        records: Vec<PeerRecord>,
        quorum: NonZeroUsize,
    },
    #[error("the request timed out")]
    Timeout { key: record::Key },
}

impl GetRecordError {
    /// Gets the key of the record for which the operation failed.
    pub fn key(&self) -> &record::Key {
        match self {
            GetRecordError::QuorumFailed { key, .. } => key,
            GetRecordError::Timeout { key, .. } => key,
            GetRecordError::NotFound { key, .. } => key,
        }
    }

    /// Extracts the key of the record for which the operation failed,
    /// consuming the error.
    pub fn into_key(self) -> record::Key {
        match self {
            GetRecordError::QuorumFailed { key, .. } => key,
            GetRecordError::Timeout { key, .. } => key,
            GetRecordError::NotFound { key, .. } => key,
        }
    }
}

/// The result of [`Behaviour::put_record`].
pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;

/// The successful result of [`Behaviour::put_record`].
#[derive(Debug, Clone)]
pub struct PutRecordOk {
    pub key: record::Key,
}

/// The error result of [`Behaviour::put_record`].
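///
/// # Example
///
/// A sketch of inspecting a failed put: even on failure, `success`
/// lists the peers that did store the record, which a caller may use
/// to decide whether a retry is worthwhile:
///
/// ```ignore
/// match result {
///     Ok(PutRecordOk { key }) => tracing::info!(?key, "record stored"),
///     Err(PutRecordError::QuorumFailed { key, success, quorum }) => {
///         tracing::warn!(?key, stored = success.len(), needed = quorum.get(), "quorum failed");
///     }
///     Err(PutRecordError::Timeout { key, .. }) => tracing::warn!(?key, "put timed out"),
/// }
/// ```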
#[derive(Debug, Clone, Error)]
pub enum PutRecordError {
    #[error("the quorum failed; needed {quorum} peers")]
    QuorumFailed {
        key: record::Key,
        /// [`PeerId`]s of the peers the record was successfully stored on.
        success: Vec<PeerId>,
        quorum: NonZeroUsize,
    },
    #[error("the request timed out")]
    Timeout {
        key: record::Key,
        /// [`PeerId`]s of the peers the record was successfully stored on.
        success: Vec<PeerId>,
        quorum: NonZeroUsize,
    },
}

impl PutRecordError {
    /// Gets the key of the record for which the operation failed.
    pub fn key(&self) -> &record::Key {
        match self {
            PutRecordError::QuorumFailed { key, .. } => key,
            PutRecordError::Timeout { key, .. } => key,
        }
    }

    /// Extracts the key of the record for which the operation failed,
    /// consuming the error.
    pub fn into_key(self) -> record::Key {
        match self {
            PutRecordError::QuorumFailed { key, .. } => key,
            PutRecordError::Timeout { key, .. } => key,
        }
    }
}

/// The result of [`Behaviour::bootstrap`].
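///
/// # Example
///
/// A sketch of seeding the routing table and starting a bootstrap;
/// `bootstrap_peer` and `bootstrap_addr` are assumed to be known:
///
/// ```ignore
/// // `bootstrap()` fails with `NoKnownPeers` if the routing table is empty,
/// // so add at least one address first.
/// behaviour.add_address(&bootstrap_peer, bootstrap_addr);
/// let query_id = behaviour.bootstrap().expect("routing table is not empty");
/// // Progress is later reported via `QueryResult::Bootstrap(..)` events.
/// ```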
pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;

/// The successful result of [`Behaviour::bootstrap`].
#[derive(Debug, Clone)]
pub struct BootstrapOk {
    pub peer: PeerId,
    pub num_remaining: u32,
}

/// The error result of [`Behaviour::bootstrap`].
#[derive(Debug, Clone, Error)]
pub enum BootstrapError {
    #[error("the request timed out")]
    Timeout {
        peer: PeerId,
        num_remaining: Option<u32>,
    },
}

/// The result of [`Behaviour::get_closest_peers`].
pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;

/// The successful result of [`Behaviour::get_closest_peers`].
#[derive(Debug, Clone)]
pub struct GetClosestPeersOk {
    pub key: Vec<u8>,
    pub peers: Vec<PeerInfo>,
}

/// The error result of [`Behaviour::get_closest_peers`].
#[derive(Debug, Clone, Error)]
pub enum GetClosestPeersError {
    #[error("the request timed out")]
    Timeout { key: Vec<u8>, peers: Vec<PeerInfo> },
}

impl GetClosestPeersError {
    /// Gets the key for which the operation failed.
    pub fn key(&self) -> &Vec<u8> {
        match self {
            GetClosestPeersError::Timeout { key, .. } => key,
        }
    }

    /// Extracts the key for which the operation failed,
    /// consuming the error.
    pub fn into_key(self) -> Vec<u8> {
        match self {
            GetClosestPeersError::Timeout { key, .. } => key,
        }
    }
}

/// The result of [`Behaviour::get_providers`].
pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;

/// The successful result of [`Behaviour::get_providers`].
#[derive(Debug, Clone)]
pub enum GetProvidersOk {
    FoundProviders {
        key: record::Key,
        /// The new set of providers discovered.
        providers: HashSet<PeerId>,
    },
    FinishedWithNoAdditionalRecord {
        closest_peers: Vec<PeerId>,
    },
}

/// The error result of [`Behaviour::get_providers`].
#[derive(Debug, Clone, Error)]
pub enum GetProvidersError {
    #[error("the request timed out")]
    Timeout {
        key: record::Key,
        closest_peers: Vec<PeerId>,
    },
}

impl GetProvidersError {
    /// Gets the key for which the operation failed.
    pub fn key(&self) -> &record::Key {
        match self {
            GetProvidersError::Timeout { key, .. } => key,
        }
    }

    /// Extracts the key for which the operation failed,
    /// consuming the error.
    pub fn into_key(self) -> record::Key {
        match self {
            GetProvidersError::Timeout { key, .. } => key,
        }
    }
}

/// The result of publishing a provider record.
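///
/// # Example
///
/// A sketch of announcing the local node as a provider; the key bytes
/// are illustrative:
///
/// ```ignore
/// let key = record::Key::new(&b"my-content-id");
/// let query_id = behaviour
///     .start_providing(key)
///     .expect("failed to store the provider record locally");
/// // The outcome is later reported as `QueryResult::StartProviding(..)`.
/// ```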
pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;

/// The successful result of publishing a provider record.
#[derive(Debug, Clone)]
pub struct AddProviderOk {
    pub key: record::Key,
}

/// The possible errors when publishing a provider record.
#[derive(Debug, Clone, Error)]
pub enum AddProviderError {
    #[error("the request timed out")]
    Timeout { key: record::Key },
}

impl AddProviderError {
    /// Gets the key for which the operation failed.
    pub fn key(&self) -> &record::Key {
        match self {
            AddProviderError::Timeout { key, .. } => key,
        }
    }
    }

    /// Extracts the key for which the operation failed,
    /// consuming the error.
    pub fn into_key(self) -> record::Key {
        match self {
            AddProviderError::Timeout { key, .. } => key,
        }
    }
}

impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
    fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
        KadPeer {
            node_id: e.node.key.into_preimage(),
            multiaddrs: e.node.value.into_vec(),
            connection_ty: match e.status {
                NodeStatus::Connected => ConnectionType::Connected,
                NodeStatus::Disconnected => ConnectionType::NotConnected,
            },
        }
    }
}

/// The context of a [`QueryInfo::AddProvider`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddProviderContext {
    /// The context is a [`Behaviour::start_providing`] operation.
    Publish,
    /// The context is periodic republishing of provider announcements
    /// initiated earlier via [`Behaviour::start_providing`].
    Republish,
}

/// The context of a [`QueryInfo::PutRecord`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PutRecordContext {
    /// The context is a [`Behaviour::put_record`] operation.
    Publish,
    /// The context is periodic republishing of records stored
    /// earlier via [`Behaviour::put_record`].
    Republish,
    /// The context is periodic replication (i.e. without extending
    /// the record TTL) of stored records received earlier from another peer.
    Replicate,
    /// The context is a custom store operation targeting specific
    /// peers initiated by [`Behaviour::put_record_to`].
    Custom,
}

/// Information about a running query.
#[derive(Debug, Clone)]
pub enum QueryInfo {
    /// A query initiated by [`Behaviour::bootstrap`].
    Bootstrap {
        /// The targeted peer ID.
        peer: PeerId,
        /// The remaining random peer IDs to query, one per
        /// bucket that still needs refreshing.
        ///
        /// This is `None` if the initial self-lookup has not
        /// yet completed and `Some` with an exhausted iterator
        /// if bootstrapping is complete.
        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
    GetClosestPeers {
        /// The key being queried (the preimage).
        key: Vec<u8>,
        /// Current index of events.
        step: ProgressStep,
        /// If set, `num_results` specifies the expected number of responding peers.
        num_results: Option<NonZeroUsize>,
    },

    /// A (repeated) query initiated by [`Behaviour::get_providers`].
    GetProviders {
        /// The key for which to search for providers.
        key: record::Key,
        /// The number of providers found so far.
        providers_found: usize,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::start_providing`].
    AddProvider {
        /// The record key.
        key: record::Key,
        /// The current phase of the query.
        phase: AddProviderPhase,
        /// The execution context of the query.
        context: AddProviderContext,
    },

    /// A (repeated) query initiated by [`Behaviour::put_record`].
    PutRecord {
        record: Record,
        /// The expected quorum of responses w.r.t. the replication factor.
        quorum: NonZeroUsize,
        /// The current phase of the query.
        phase: PutRecordPhase,
        /// The execution context of the query.
        context: PutRecordContext,
    },

    /// A (repeated) query initiated by [`Behaviour::get_record`].
    GetRecord {
        /// The key to look for.
        key: record::Key,
        /// Current index of events.
        step: ProgressStep,
        /// Did we find at least one record?
        found_a_record: bool,
        /// The peers closest to the `key` that were queried but did not return a record,
        /// i.e. the peers that are candidates for caching the record.
        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
    },
}

impl QueryInfo {
    /// Creates an event for a handler to issue an outgoing request in the
    /// context of a query.
    fn to_request(&self, query_id: QueryId) -> HandlerIn {
        match &self {
            QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
                key: peer.to_bytes(),
                query_id,
            },
            QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
                key: key.clone(),
                query_id,
            },
            QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
                key: key.clone(),
                query_id,
            },
            QueryInfo::AddProvider { key, phase, .. } => match phase {
                AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
                    key: key.to_vec(),
                    query_id,
                },
                AddProviderPhase::AddProvider {
                    provider_id,
                    external_addresses,
                    ..
                } => HandlerIn::AddProvider {
                    key: key.clone(),
                    provider: crate::protocol::KadPeer {
                        node_id: *provider_id,
                        multiaddrs: external_addresses.clone(),
                        connection_ty: crate::protocol::ConnectionType::Connected,
                    },
                    query_id,
                },
            },
            QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
                key: key.clone(),
                query_id,
            },
            QueryInfo::PutRecord { record, phase, .. } => match phase {
                PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
                    key: record.key.to_vec(),
                    query_id,
                },
                PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
                    record: record.clone(),
                    query_id,
                },
            },
        }
    }
}

/// The phases of a [`QueryInfo::AddProvider`] query.
#[derive(Debug, Clone)]
pub enum AddProviderPhase {
    /// The query is searching for the closest nodes to the record key.
    GetClosestPeers,

    /// The query advertises the local node as a provider for the key to
    /// the closest nodes to the key.
    AddProvider {
        /// The local peer ID that is advertised as a provider.
        provider_id: PeerId,
        /// The external addresses of the provider being advertised.
        external_addresses: Vec<Multiaddr>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}

/// The phases of a [`QueryInfo::PutRecord`] query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PutRecordPhase {
    /// The query is searching for the closest nodes to the record key.
    GetClosestPeers,

    /// The query is replicating the record to the closest nodes to the key.
    PutRecord {
        /// A list of peers the given record has been successfully replicated to.
        success: Vec<PeerId>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}

/// A mutable reference to a running query.
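///
/// # Example
///
/// A sketch of finishing a long-running query early, e.g. once enough
/// results have arrived; `query_id` is assumed to come from an earlier
/// call such as [`Behaviour::get_closest_peers`]:
///
/// ```ignore
/// if let Some(mut query) = behaviour.query_mut(&query_id) {
///     query.finish();
/// }
/// ```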
pub struct QueryMut<'a> {
    query: &'a mut Query,
}

impl QueryMut<'_> {
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }

    /// Finishes the query as soon as possible, without waiting for the
    /// regular termination conditions.
    pub fn finish(&mut self) {
        self.query.finish()
    }
}

/// An immutable reference to a running query.
pub struct QueryRef<'a> {
    query: &'a Query,
}

impl QueryRef<'_> {
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }
}

/// An operation failed due to there being no known peers in the routing table.
#[derive(Debug, Clone)]
pub struct NoKnownPeers();

impl fmt::Display for NoKnownPeers {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "No known peers.")
    }
}

impl std::error::Error for NoKnownPeers {}

/// The possible outcomes of [`Behaviour::add_address`].
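///
/// # Example
///
/// A sketch of adding a known address and reacting to the outcome;
/// `peer_id` and `addr` are assumed:
///
/// ```ignore
/// match behaviour.add_address(&peer_id, addr) {
///     RoutingUpdate::Success => { /* the peer is routable right away */ }
///     RoutingUpdate::Pending => { /* wait for `Event::RoutingUpdated` */ }
///     RoutingUpdate::Failed => tracing::warn!("bucket full or invalid peer ID"),
/// }
/// ```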
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoutingUpdate {
    /// The given peer and address have been added to the routing
    /// table.
    Success,
    /// The peer and address are pending insertion into
    /// the routing table, if a disconnected peer fails
    /// to respond. If the given peer and address end up
    /// in the routing table, [`Event::RoutingUpdated`]
    /// is eventually emitted.
    Pending,
    /// The routing table update failed, either because the
    /// corresponding bucket for the peer is full and the
    /// pending slot(s) are occupied, or because the given
    /// peer ID is deemed invalid (e.g. refers to the local
    /// peer ID).
    Failed,
}

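/// The mode of operation of a Kademlia node: as a [`Mode::Client`] it issues
/// outbound requests but does not answer inbound ones, while as a
/// [`Mode::Server`] it also serves inbound requests and can therefore be added
/// to other peers' routing tables.
///
/// # Example
///
/// A sketch of overriding the automatic mode detection via
/// [`Behaviour::set_mode`]:
///
/// ```ignore
/// behaviour.set_mode(Some(Mode::Server)); // force server mode
/// behaviour.set_mode(None); // revert to automatic detection
/// ```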
#[derive(PartialEq, Copy, Clone, Debug)]
pub enum Mode {
    Client,
    Server,
}

impl fmt::Display for Mode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Mode::Client => write!(f, "client"),
            Mode::Server => write!(f, "server"),
        }
    }
}

fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
where
    T: ToString,
{
    confirmed_external_addresses
        .iter()
        .map(|addr| addr.to_string())
        .collect::<Vec<_>>()
        .join(", ")
}