libp2p_kad/
query.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21mod peers;
22
23use std::{num::NonZeroUsize, time::Duration};
24
25use either::Either;
26use fnv::FnvHashMap;
27use libp2p_core::Multiaddr;
28use libp2p_identity::PeerId;
29use peers::{
30    closest::{disjoint::ClosestDisjointPeersIter, ClosestPeersIter, ClosestPeersIterConfig},
31    fixed::FixedPeersIter,
32    PeersIterState,
33};
34use smallvec::SmallVec;
35use web_time::Instant;
36
37use crate::{
38    behaviour::PeerInfo,
39    handler::HandlerIn,
40    kbucket::{Key, KeyBytes},
41    QueryInfo, ALPHA_VALUE, K_VALUE,
42};
43
44/// A `QueryPool` provides an aggregate state machine for driving `Query`s to completion.
45///
46/// Internally, a `Query` is in turn driven by an underlying `QueryPeerIter`
47/// that determines the peer selection strategy, i.e. the order in which the
48/// peers involved in the query should be contacted.
49pub(crate) struct QueryPool {
50    next_id: usize,
51    config: QueryConfig,
52    queries: FnvHashMap<QueryId, Query>,
53}
54
55/// The observable states emitted by [`QueryPool::poll`].
56pub(crate) enum QueryPoolState<'a> {
57    /// The pool is idle, i.e. there are no queries to process.
58    Idle,
59    /// At least one query is waiting for results. `Some(request)` indicates
60    /// that a new request is now being waited on.
61    Waiting(Option<(&'a mut Query, PeerId)>),
62    /// A query has finished.
63    Finished(Query),
64    /// A query has timed out.
65    Timeout(Query),
66}
67
68impl QueryPool {
69    /// Creates a new `QueryPool` with the given configuration.
70    pub(crate) fn new(config: QueryConfig) -> Self {
71        QueryPool {
72            next_id: 0,
73            config,
74            queries: Default::default(),
75        }
76    }
77
78    /// Gets a reference to the `QueryConfig` used by the pool.
79    pub(crate) fn config(&self) -> &QueryConfig {
80        &self.config
81    }
82
83    /// Returns an iterator over the queries in the pool.
84    pub(crate) fn iter(&self) -> impl Iterator<Item = &Query> {
85        self.queries.values()
86    }
87
88    /// Gets the current size of the pool, i.e. the number of running queries.
89    pub(crate) fn size(&self) -> usize {
90        self.queries.len()
91    }
92
93    /// Returns an iterator that allows modifying each query in the pool.
94    pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query> {
95        self.queries.values_mut()
96    }
97
98    /// Adds a query to the pool that contacts a fixed set of peers.
99    pub(crate) fn add_fixed<I>(&mut self, peers: I, info: QueryInfo) -> QueryId
100    where
101        I: IntoIterator<Item = PeerId>,
102    {
103        let id = self.next_query_id();
104        self.continue_fixed(id, peers, info);
105        id
106    }
107
108    /// Continues an earlier query with a fixed set of peers, reusing
109    /// the given query ID, which must be from a query that finished
110    /// earlier.
111    pub(crate) fn continue_fixed<I>(&mut self, id: QueryId, peers: I, info: QueryInfo)
112    where
113        I: IntoIterator<Item = PeerId>,
114    {
115        assert!(!self.queries.contains_key(&id));
116        let parallelism = self.config.replication_factor;
117        let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
118        let query = Query::new(id, peer_iter, info);
119        self.queries.insert(id, query);
120    }
121
122    /// Adds a query to the pool that iterates towards the closest peers to the target.
123    pub(crate) fn add_iter_closest<T, I>(&mut self, target: T, peers: I, info: QueryInfo) -> QueryId
124    where
125        T: Into<KeyBytes> + Clone,
126        I: IntoIterator<Item = Key<PeerId>>,
127    {
128        let id = self.next_query_id();
129        self.continue_iter_closest(id, target, peers, info);
130        id
131    }
132
133    /// Adds a query to the pool that iterates towards the closest peers to the target.
134    pub(crate) fn continue_iter_closest<T, I>(
135        &mut self,
136        id: QueryId,
137        target: T,
138        peers: I,
139        info: QueryInfo,
140    ) where
141        T: Into<KeyBytes> + Clone,
142        I: IntoIterator<Item = Key<PeerId>>,
143    {
144        let num_results = match info {
145            QueryInfo::GetClosestPeers {
146                num_results: Some(val),
147                ..
148            } => val,
149            _ => self.config.replication_factor,
150        };
151
152        let cfg = ClosestPeersIterConfig {
153            num_results,
154            parallelism: self.config.parallelism,
155            ..ClosestPeersIterConfig::default()
156        };
157
158        let peer_iter = if self.config.disjoint_query_paths {
159            QueryPeerIter::ClosestDisjoint(ClosestDisjointPeersIter::with_config(
160                cfg, target, peers,
161            ))
162        } else {
163            QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
164        };
165
166        let query = Query::new(id, peer_iter, info);
167        self.queries.insert(id, query);
168    }
169
170    fn next_query_id(&mut self) -> QueryId {
171        let id = QueryId(self.next_id);
172        self.next_id = self.next_id.wrapping_add(1);
173        id
174    }
175
176    /// Returns a reference to a query with the given ID, if it is in the pool.
177    pub(crate) fn get(&self, id: &QueryId) -> Option<&Query> {
178        self.queries.get(id)
179    }
180
181    /// Returns a mutablereference to a query with the given ID, if it is in the pool.
182    pub(crate) fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query> {
183        self.queries.get_mut(id)
184    }
185
186    /// Polls the pool to advance the queries.
187    pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_> {
188        let mut finished = None;
189        let mut timeout = None;
190        let mut waiting = None;
191
192        for (&query_id, query) in self.queries.iter_mut() {
193            query.stats.start = query.stats.start.or(Some(now));
194            match query.next(now) {
195                PeersIterState::Finished => {
196                    finished = Some(query_id);
197                    break;
198                }
199                PeersIterState::Waiting(Some(peer_id)) => {
200                    let peer = peer_id.into_owned();
201                    waiting = Some((query_id, peer));
202                    break;
203                }
204                PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
205                    let elapsed = now - query.stats.start.unwrap_or(now);
206                    if elapsed >= self.config.timeout {
207                        timeout = Some(query_id);
208                        break;
209                    }
210                }
211            }
212        }
213
214        if let Some((query_id, peer_id)) = waiting {
215            let query = self.queries.get_mut(&query_id).expect("s.a.");
216            return QueryPoolState::Waiting(Some((query, peer_id)));
217        }
218
219        if let Some(query_id) = finished {
220            let mut query = self.queries.remove(&query_id).expect("s.a.");
221            query.stats.end = Some(now);
222            return QueryPoolState::Finished(query);
223        }
224
225        if let Some(query_id) = timeout {
226            let mut query = self.queries.remove(&query_id).expect("s.a.");
227            query.stats.end = Some(now);
228            return QueryPoolState::Timeout(query);
229        }
230
231        if self.queries.is_empty() {
232            QueryPoolState::Idle
233        } else {
234            QueryPoolState::Waiting(None)
235        }
236    }
237}
238
/// Unique identifier for an active query.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct QueryId(usize);

impl std::fmt::Display for QueryId {
    /// Writes the raw numeric ID in plain decimal form. Formatting flags of
    /// the outer formatter (width, fill, ...) are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let QueryId(raw) = *self;
        f.write_fmt(format_args!("{}", raw))
    }
}
248
249/// The configuration for queries in a `QueryPool`.
250#[derive(Debug, Clone)]
251pub(crate) struct QueryConfig {
252    /// Timeout of a single query.
253    ///
254    /// See [`crate::behaviour::Config::set_query_timeout`] for details.
255    pub(crate) timeout: Duration,
256    /// The replication factor to use.
257    ///
258    /// See [`crate::behaviour::Config::set_replication_factor`] for details.
259    pub(crate) replication_factor: NonZeroUsize,
260    /// Allowed level of parallelism for iterative queries.
261    ///
262    /// See [`crate::behaviour::Config::set_parallelism`] for details.
263    pub(crate) parallelism: NonZeroUsize,
264    /// Whether to use disjoint paths on iterative lookups.
265    ///
266    /// See [`crate::behaviour::Config::disjoint_query_paths`] for details.
267    pub(crate) disjoint_query_paths: bool,
268}
269
270impl Default for QueryConfig {
271    fn default() -> Self {
272        QueryConfig {
273            timeout: Duration::from_secs(60),
274            replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
275            parallelism: ALPHA_VALUE,
276            disjoint_query_paths: false,
277        }
278    }
279}
280
281/// A query in a `QueryPool`.
282pub(crate) struct Query {
283    /// The unique ID of the query.
284    id: QueryId,
285    /// The peer iterator that drives the query state.
286    pub(crate) peers: QueryPeers,
287    /// Execution statistics of the query.
288    pub(crate) stats: QueryStats,
289    /// The query-specific state.
290    pub(crate) info: QueryInfo,
291    /// A map of pending requests to peers.
292    ///
293    /// A request is pending if the targeted peer is not currently connected
294    /// and these requests are sent as soon as a connection to the peer is established.
295    pub(crate) pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>,
296}
297
298/// The peer iterator that drives the query state,
299pub(crate) struct QueryPeers {
300    /// Addresses of peers discovered during a query.
301    pub(crate) addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
302    /// The peer iterator that drives the query state.
303    peer_iter: QueryPeerIter,
304}
305
306impl QueryPeers {
307    /// Consumes the peers iterator, producing a final `Iterator` over the discovered `PeerId`s.
308    pub(crate) fn into_peerids_iter(self) -> impl Iterator<Item = PeerId> {
309        match self.peer_iter {
310            QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
311            QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
312            QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
313        }
314    }
315
316    /// Consumes the peers iterator, producing a final `Iterator` over the discovered `PeerId`s
317    /// with their matching `Multiaddr`s.
318    pub(crate) fn into_peerinfos_iter(mut self) -> impl Iterator<Item = PeerInfo> {
319        match self.peer_iter {
320            QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
321            QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
322            QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
323        }
324        .map(move |peer_id| {
325            let addrs = self.addresses.remove(&peer_id).unwrap_or_default().to_vec();
326            PeerInfo { peer_id, addrs }
327        })
328    }
329}
330
331/// The peer selection strategies that can be used by queries.
332enum QueryPeerIter {
333    Closest(ClosestPeersIter),
334    ClosestDisjoint(ClosestDisjointPeersIter),
335    Fixed(FixedPeersIter),
336}
337
338impl Query {
339    /// Creates a new query without starting it.
340    fn new(id: QueryId, peer_iter: QueryPeerIter, info: QueryInfo) -> Self {
341        Query {
342            id,
343            info,
344            peers: QueryPeers {
345                addresses: Default::default(),
346                peer_iter,
347            },
348            pending_rpcs: SmallVec::default(),
349            stats: QueryStats::empty(),
350        }
351    }
352
353    /// Gets the unique ID of the query.
354    pub(crate) fn id(&self) -> QueryId {
355        self.id
356    }
357
358    /// Gets the current execution statistics of the query.
359    pub(crate) fn stats(&self) -> &QueryStats {
360        &self.stats
361    }
362
363    /// Informs the query that the attempt to contact `peer` failed.
364    pub(crate) fn on_failure(&mut self, peer: &PeerId) {
365        let updated = match &mut self.peers.peer_iter {
366            QueryPeerIter::Closest(iter) => iter.on_failure(peer),
367            QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
368            QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
369        };
370        if updated {
371            self.stats.failure += 1;
372        }
373    }
374
375    /// Informs the query that the attempt to contact `peer` succeeded,
376    /// possibly resulting in new peers that should be incorporated into
377    /// the query, if applicable.
378    pub(crate) fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
379    where
380        I: IntoIterator<Item = PeerId>,
381    {
382        let updated = match &mut self.peers.peer_iter {
383            QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
384            QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
385            QueryPeerIter::Fixed(iter) => iter.on_success(peer),
386        };
387        if updated {
388            self.stats.success += 1;
389        }
390    }
391
392    /// Advances the state of the underlying peer iterator.
393    fn next(&mut self, now: Instant) -> PeersIterState<'_> {
394        let state = match &mut self.peers.peer_iter {
395            QueryPeerIter::Closest(iter) => iter.next(now),
396            QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
397            QueryPeerIter::Fixed(iter) => iter.next(),
398        };
399
400        if let PeersIterState::Waiting(Some(_)) = state {
401            self.stats.requests += 1;
402        }
403
404        state
405    }
406
407    /// Tries to (gracefully) finish the query prematurely, providing the peers
408    /// that are no longer of interest for further progress of the query.
409    ///
410    /// A query may require that in order to finish gracefully a certain subset
411    /// of peers must be contacted. E.g. in the case of disjoint query paths a
412    /// query may only finish gracefully if every path contacted a peer whose
413    /// response permits termination of the query. The given peers are those for
414    /// which this is considered to be the case, i.e. for which a termination
415    /// condition is satisfied.
416    ///
417    /// Returns `true` if the query did indeed finish, `false` otherwise. In the
418    /// latter case, a new attempt at finishing the query may be made with new
419    /// `peers`.
420    ///
421    /// A finished query immediately stops yielding new peers to contact and
422    /// will be reported by [`QueryPool::poll`] via
423    /// [`QueryPoolState::Finished`].
424    pub(crate) fn try_finish<'a, I>(&mut self, peers: I) -> bool
425    where
426        I: IntoIterator<Item = &'a PeerId>,
427    {
428        match &mut self.peers.peer_iter {
429            QueryPeerIter::Closest(iter) => {
430                iter.finish();
431                true
432            }
433            QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
434            QueryPeerIter::Fixed(iter) => {
435                iter.finish();
436                true
437            }
438        }
439    }
440
441    /// Finishes the query prematurely.
442    ///
443    /// A finished query immediately stops yielding new peers to contact and will be
444    /// reported by [`QueryPool::poll`] via [`QueryPoolState::Finished`].
445    pub(crate) fn finish(&mut self) {
446        match &mut self.peers.peer_iter {
447            QueryPeerIter::Closest(iter) => iter.finish(),
448            QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
449            QueryPeerIter::Fixed(iter) => iter.finish(),
450        }
451    }
452
453    /// Checks whether the query has finished.
454    ///
455    /// A finished query is eventually reported by `QueryPool::next()` and
456    /// removed from the pool.
457    pub(crate) fn is_finished(&self) -> bool {
458        match &self.peers.peer_iter {
459            QueryPeerIter::Closest(iter) => iter.is_finished(),
460            QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
461            QueryPeerIter::Fixed(iter) => iter.is_finished(),
462        }
463    }
464}
465
/// Execution statistics of a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryStats {
    /// Number of requests initiated by the query.
    requests: u32,
    /// Number of requests that were counted as successful.
    success: u32,
    /// Number of requests that were counted as failed.
    failure: u32,
    /// The instant at which the query started, if it already did.
    start: Option<Instant>,
    /// The instant at which the query ended, if it already did.
    end: Option<Instant>,
}

impl QueryStats {
    /// Creates statistics with all counters at zero and neither a start nor
    /// an end instant.
    pub fn empty() -> Self {
        QueryStats {
            requests: 0,
            success: 0,
            failure: 0,
            start: None,
            end: None,
        }
    }

    /// Gets the total number of requests initiated by the query.
    pub fn num_requests(&self) -> u32 {
        self.requests
    }

    /// Gets the number of successful requests.
    pub fn num_successes(&self) -> u32 {
        self.success
    }

    /// Gets the number of failed requests.
    pub fn num_failures(&self) -> u32 {
        self.failure
    }

    /// Gets the number of pending requests, i.e. the requests that have
    /// neither succeeded nor failed yet.
    ///
    /// The computation saturates, so the result is `0` rather than an
    /// arithmetic overflow should the completed requests ever exceed the
    /// initiated ones.
    ///
    /// > **Note**: A query can finish while still having pending
    /// > requests, if the termination conditions are already met.
    pub fn num_pending(&self) -> u32 {
        let completed = self.success.saturating_add(self.failure);
        self.requests.saturating_sub(completed)
    }

    /// Gets the duration of the query.
    ///
    /// If the query has not yet finished, the duration is measured from the
    /// start of the query to the current instant.
    ///
    /// If the query did not yet start (no start instant has been recorded),
    /// `None` is returned.
    pub fn duration(&self) -> Option<Duration> {
        let start = self.start?;
        // A query without an end instant is still running: measure up to now.
        let end = self.end.unwrap_or_else(Instant::now);
        Some(end - start)
    }

    /// Merges these stats with the given stats of another query,
    /// e.g. to accumulate statistics from a multi-phase query.
    ///
    /// Counters are merged cumulatively (saturating at `u32::MAX`) while the
    /// instants for start and end of the queries are taken as the minimum and
    /// maximum, respectively. A missing instant on one side yields the
    /// instant of the other side.
    pub fn merge(self, other: QueryStats) -> Self {
        QueryStats {
            requests: self.requests.saturating_add(other.requests),
            success: self.success.saturating_add(other.success),
            failure: self.failure.saturating_add(other.failure),
            start: match (self.start, other.start) {
                (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
                (a, b) => a.or(b),
            },
            // `None` orders before any `Some`, so `max` prefers a known end.
            end: std::cmp::max(self.end, other.end),
        }
    }
}