libp2p_kad/
kbucket.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of a Kademlia routing table as used by a single peer
22//! participating in a Kademlia DHT.
23//!
24//! The entry point for the API of this module is a [`KBucketsTable`].
25//!
26//! ## Pending Insertions
27//!
28//! When the bucket associated with the `Key` of an inserted entry is full
29//! but contains disconnected nodes, it accepts a [`PendingEntry`].
30//! Pending entries are inserted lazily when their timeout is found to be expired
31//! upon querying the `KBucketsTable`. When that happens, the `KBucketsTable` records
32//! an [`AppliedPending`] result which must be consumed by calling [`take_applied_pending`]
33//! regularly and / or after performing lookup operations like [`entry`] and [`closest`].
34//!
35//! [`entry`]: KBucketsTable::entry
36//! [`closest`]: KBucketsTable::closest
37//! [`AppliedPending`]: bucket::AppliedPending
38//! [`take_applied_pending`]: KBucketsTable::take_applied_pending
39//! [`PendingEntry`]: entry::PendingEntry
40
41// [Implementation Notes]
42//
43// 1. Routing Table Layout
44//
45// The routing table is currently implemented as a fixed-size "array" of
46// buckets, ordered by increasing distance relative to a local key
47// that identifies the local peer. This is an often-used, simplified
48// implementation that approximates the properties of the b-tree (or prefix tree)
49// implementation described in the full paper [0], whereby buckets are split on-demand.
50// This should be treated as an implementation detail, however, so that the
51// implementation may change in the future without breaking the API.
52//
53// 2. Replacement Cache
54//
55// In this implementation, the "replacement cache" for unresponsive peers
56// consists of a single entry per bucket. Furthermore, this implementation is
57// currently tailored to connection-oriented transports, meaning that the
58// "LRU"-based ordering of entries in a bucket is actually based on the last reported
59// connection status of the corresponding peers, from least-recently (dis)connected to
60// most-recently (dis)connected, and controlled through the `Entry` API. As a result,
61// the nodes in the buckets are not reordered as a result of RPC activity, but only as a
62// result of nodes being marked as connected or disconnected. In particular,
63// if a bucket is full and contains only entries for peers that are considered
64// connected, no pending entry is accepted. See the `bucket` submodule for
65// further details.
66//
67// [0]: https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf
68
69mod bucket;
70mod entry;
71#[allow(clippy::ptr_offset_with_cast)]
72#[allow(clippy::assign_op_pattern)]
73mod key;
74
75use std::{collections::VecDeque, num::NonZeroUsize, time::Duration};
76
77use bucket::KBucket;
78pub use bucket::NodeStatus;
79pub use entry::*;
80use smallvec::SmallVec;
81use web_time::Instant;
82
/// Total number of k-buckets a routing table is created with; valid bucket
/// indices lie in `[0, NUM_BUCKETS)`.
const NUM_BUCKETS: usize = 256;
85
/// Tunable parameters applied to every bucket of a `KBucketsTable`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct KBucketConfig {
    /// Upper bound on the number of nodes a single bucket may hold.
    bucket_size: usize,
    /// How long a [`PendingEntry`] has to wait after its creation before it
    /// may be inserted into a full bucket, taking the place of the
    /// least-recently (dis)connected node.
    pending_timeout: Duration,
}
96
97impl Default for KBucketConfig {
98    fn default() -> Self {
99        KBucketConfig {
100            bucket_size: K_VALUE.get(),
101            pending_timeout: Duration::from_secs(60),
102        }
103    }
104}
105
106impl KBucketConfig {
107    /// Modifies the maximal number of nodes that a bucket can contain.
108    pub(crate) fn set_bucket_size(&mut self, bucket_size: NonZeroUsize) {
109        self.bucket_size = bucket_size.get();
110    }
111
112    /// Modifies the duration after creation of a [`PendingEntry`] after which
113    /// it becomes eligible for insertion into a full bucket, replacing the
114    /// least-recently (dis)connected node.
115    pub(crate) fn set_pending_timeout(&mut self, pending_timeout: Duration) {
116        self.pending_timeout = pending_timeout;
117    }
118}
119
120/// A `KBucketsTable` represents a Kademlia routing table.
121#[derive(Debug, Clone)]
122pub(crate) struct KBucketsTable<TKey, TVal> {
123    /// The key identifying the local peer that owns the routing table.
124    local_key: TKey,
125    /// The buckets comprising the routing table.
126    buckets: Vec<KBucket<TKey, TVal>>,
127    /// The maximal number of nodes that a bucket can contain.
128    bucket_size: usize,
129    /// The list of evicted entries that have been replaced with pending
130    /// entries since the last call to [`KBucketsTable::take_applied_pending`].
131    applied_pending: VecDeque<AppliedPending<TKey, TVal>>,
132}
133
/// Type-safe wrapper around the position of a bucket inside a
/// `KBucketsTable`, i.e. a non-negative integer in the interval
/// `[0, NUM_BUCKETS)`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct BucketIndex(usize);
138
139impl BucketIndex {
140    /// Creates a new `BucketIndex` for a `Distance`.
141    ///
142    /// The given distance is interpreted as the distance from a `local_key` of
143    /// a `KBucketsTable`. If the distance is zero, `None` is returned, in
144    /// recognition of the fact that the only key with distance `0` to a
145    /// `local_key` is the `local_key` itself, which does not belong in any
146    /// bucket.
147    fn new(d: &Distance) -> Option<BucketIndex> {
148        d.ilog2().map(|i| BucketIndex(i as usize))
149    }
150
151    /// Gets the index value as an unsigned integer.
152    fn get(&self) -> usize {
153        self.0
154    }
155
156    /// Returns the minimum inclusive and maximum inclusive [`Distance`]
157    /// included in the bucket for this index.
158    fn range(&self) -> (Distance, Distance) {
159        let min = Distance(U256::pow(U256::from(2), U256::from(self.0)));
160        if self.0 == usize::from(u8::MAX) {
161            (min, Distance(U256::MAX))
162        } else {
163            let max = Distance(U256::pow(U256::from(2), U256::from(self.0 + 1)) - 1);
164            (min, max)
165        }
166    }
167
168    /// Generates a random distance that falls into the bucket for this index.
169    fn rand_distance(&self, rng: &mut impl rand::Rng) -> Distance {
170        let mut bytes = [0u8; 32];
171        let quot = self.0 / 8;
172        for i in 0..quot {
173            bytes[31 - i] = rng.gen();
174        }
175        let rem = (self.0 % 8) as u32;
176        let lower = usize::pow(2, rem);
177        let upper = usize::pow(2, rem + 1);
178        bytes[31 - quot] = rng.gen_range(lower..upper) as u8;
179        Distance(U256::from_big_endian(bytes.as_slice()))
180    }
181}
182
183impl<TKey, TVal> KBucketsTable<TKey, TVal>
184where
185    TKey: Clone + AsRef<KeyBytes>,
186    TVal: Clone,
187{
188    /// Creates a new, empty Kademlia routing table with entries partitioned
189    /// into buckets as per the Kademlia protocol using the provided config.
190    pub(crate) fn new(local_key: TKey, config: KBucketConfig) -> Self {
191        KBucketsTable {
192            local_key,
193            buckets: (0..NUM_BUCKETS).map(|_| KBucket::new(config)).collect(),
194            bucket_size: config.bucket_size,
195            applied_pending: VecDeque::new(),
196        }
197    }
198
199    /// Returns the local key.
200    pub(crate) fn local_key(&self) -> &TKey {
201        &self.local_key
202    }
203
204    /// Returns an `Entry` for the given key, representing the state of the entry
205    /// in the routing table.
206    ///
207    /// Returns `None` in case the key points to the local node.
208    pub(crate) fn entry<'a>(&'a mut self, key: &'a TKey) -> Option<Entry<'a, TKey, TVal>> {
209        let index = BucketIndex::new(&self.local_key.as_ref().distance(key))?;
210
211        let bucket = &mut self.buckets[index.get()];
212        if let Some(applied) = bucket.apply_pending() {
213            self.applied_pending.push_back(applied)
214        }
215        Some(Entry::new(bucket, key))
216    }
217
218    /// Returns an iterator over all buckets.
219    ///
220    /// The buckets are ordered by proximity to the `local_key`, i.e. the first
221    /// bucket is the closest bucket (containing at most one key).
222    pub(crate) fn iter(&mut self) -> impl Iterator<Item = KBucketRef<'_, TKey, TVal>> + '_ {
223        let applied_pending = &mut self.applied_pending;
224        self.buckets.iter_mut().enumerate().map(move |(i, b)| {
225            if let Some(applied) = b.apply_pending() {
226                applied_pending.push_back(applied)
227            }
228            KBucketRef {
229                index: BucketIndex(i),
230                bucket: b,
231            }
232        })
233    }
234
235    /// Returns the bucket for the distance to the given key.
236    ///
237    /// Returns `None` if the given key refers to the local key.
238    pub(crate) fn bucket<K>(&mut self, key: &K) -> Option<KBucketRef<'_, TKey, TVal>>
239    where
240        K: AsRef<KeyBytes>,
241    {
242        let d = self.local_key.as_ref().distance(key);
243        if let Some(index) = BucketIndex::new(&d) {
244            let bucket = &mut self.buckets[index.0];
245            if let Some(applied) = bucket.apply_pending() {
246                self.applied_pending.push_back(applied)
247            }
248            Some(KBucketRef { bucket, index })
249        } else {
250            None
251        }
252    }
253
254    /// Consumes the next applied pending entry, if any.
255    ///
256    /// When an entry is attempted to be inserted and the respective bucket is full,
257    /// it may be recorded as pending insertion after a timeout, see [`InsertResult::Pending`].
258    ///
259    /// If the oldest currently disconnected entry in the respective bucket does not change
260    /// its status until the timeout of pending entry expires, it is evicted and
261    /// the pending entry inserted instead. These insertions of pending entries
262    /// happens lazily, whenever the `KBucketsTable` is accessed, and the corresponding
263    /// buckets are updated accordingly. The fact that a pending entry was applied is
264    /// recorded in the `KBucketsTable` in the form of `AppliedPending` results, which must be
265    /// consumed by calling this function.
266    pub(crate) fn take_applied_pending(&mut self) -> Option<AppliedPending<TKey, TVal>> {
267        self.applied_pending.pop_front()
268    }
269
270    /// Returns an iterator over the keys closest to `target`, ordered by
271    /// increasing distance.
272    pub(crate) fn closest_keys<'a, T>(
273        &'a mut self,
274        target: &'a T,
275    ) -> impl Iterator<Item = TKey> + 'a
276    where
277        T: AsRef<KeyBytes>,
278    {
279        let distance = self.local_key.as_ref().distance(target);
280        let bucket_size = self.bucket_size;
281        ClosestIter {
282            target,
283            iter: None,
284            table: self,
285            buckets_iter: ClosestBucketsIter::new(distance),
286            fmap: |(n, _status): (&Node<TKey, TVal>, NodeStatus)| n.key.clone(),
287            bucket_size,
288        }
289    }
290
291    /// Returns an iterator over the nodes closest to the `target` key, ordered by
292    /// increasing distance.
293    pub(crate) fn closest<'a, T>(
294        &'a mut self,
295        target: &'a T,
296    ) -> impl Iterator<Item = EntryView<TKey, TVal>> + 'a
297    where
298        T: Clone + AsRef<KeyBytes>,
299        TVal: Clone,
300    {
301        let distance = self.local_key.as_ref().distance(target);
302        let bucket_size = self.bucket_size;
303        ClosestIter {
304            target,
305            iter: None,
306            table: self,
307            buckets_iter: ClosestBucketsIter::new(distance),
308            fmap: |(n, status): (&Node<TKey, TVal>, NodeStatus)| EntryView {
309                node: n.clone(),
310                status,
311            },
312            bucket_size,
313        }
314    }
315
316    /// Counts the number of nodes between the local node and the node
317    /// closest to `target`.
318    ///
319    /// The number of nodes between the local node and the target are
320    /// calculated by backtracking from the target towards the local key.
321    pub(crate) fn count_nodes_between<T>(&mut self, target: &T) -> usize
322    where
323        T: AsRef<KeyBytes>,
324    {
325        let local_key = self.local_key.clone();
326        let distance = target.as_ref().distance(&local_key);
327        let mut iter = ClosestBucketsIter::new(distance).take_while(|i| i.get() != 0);
328        if let Some(i) = iter.next() {
329            let num_first = self.buckets[i.get()]
330                .iter()
331                .filter(|(n, _)| n.key.as_ref().distance(&local_key) <= distance)
332                .count();
333            let num_rest: usize = iter.map(|i| self.buckets[i.get()].num_entries()).sum();
334            num_first + num_rest
335        } else {
336            0
337        }
338    }
339}
340
341/// An iterator over (some projection of) the closest entries in a
342/// `KBucketsTable` w.r.t. some target `Key`.
343struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> {
344    /// A reference to the target key whose distance to the local key determines
345    /// the order in which the buckets are traversed. The resulting
346    /// array from projecting the entries of each bucket using `fmap` is
347    /// sorted according to the distance to the target.
348    target: &'a TTarget,
349    /// A reference to all buckets of the `KBucketsTable`.
350    table: &'a mut KBucketsTable<TKey, TVal>,
351    /// The iterator over the bucket indices in the order determined by the
352    /// distance of the local key to the target.
353    buckets_iter: ClosestBucketsIter,
354    /// The iterator over the entries in the currently traversed bucket.
355    iter: Option<ClosestIterBuffer<TOut>>,
356    /// The projection function / mapping applied on each bucket as
357    /// it is encountered, producing the next `iter`ator.
358    fmap: TMap,
359    /// The maximal number of nodes that a bucket can contain.
360    bucket_size: usize,
361}
362
363/// An iterator over the bucket indices, in the order determined by the `Distance` of
364/// a target from the `local_key`, such that the entries in the buckets are incrementally
365/// further away from the target, starting with the bucket covering the target.
366struct ClosestBucketsIter {
367    /// The distance to the `local_key`.
368    distance: Distance,
369    /// The current state of the iterator.
370    state: ClosestBucketsIterState,
371}
372
373/// Operating states of a `ClosestBucketsIter`.
374enum ClosestBucketsIterState {
375    /// The starting state of the iterator yields the first bucket index and
376    /// then transitions to `ZoomIn`.
377    Start(BucketIndex),
378    /// The iterator "zooms in" to yield the next bucket containing nodes that
379    /// are incrementally closer to the local node but further from the `target`.
380    /// These buckets are identified by a `1` in the corresponding bit position
381    /// of the distance bit string. When bucket `0` is reached, the iterator
382    /// transitions to `ZoomOut`.
383    ZoomIn(BucketIndex),
384    /// Once bucket `0` has been reached, the iterator starts "zooming out"
385    /// to buckets containing nodes that are incrementally further away from
386    /// both the local key and the target. These are identified by a `0` in
387    /// the corresponding bit position of the distance bit string. When bucket
388    /// `255` is reached, the iterator transitions to state `Done`.
389    ZoomOut(BucketIndex),
390    /// The iterator is in this state once it has visited all buckets.
391    Done,
392}
393
394impl ClosestBucketsIter {
395    fn new(distance: Distance) -> Self {
396        let state = match BucketIndex::new(&distance) {
397            Some(i) => ClosestBucketsIterState::Start(i),
398            None => ClosestBucketsIterState::Start(BucketIndex(0)),
399        };
400        Self { distance, state }
401    }
402
403    fn next_in(&self, i: BucketIndex) -> Option<BucketIndex> {
404        (0..i.get()).rev().find_map(|i| {
405            if self.distance.0.bit(i) {
406                Some(BucketIndex(i))
407            } else {
408                None
409            }
410        })
411    }
412
413    fn next_out(&self, i: BucketIndex) -> Option<BucketIndex> {
414        (i.get() + 1..NUM_BUCKETS).find_map(|i| {
415            if !self.distance.0.bit(i) {
416                Some(BucketIndex(i))
417            } else {
418                None
419            }
420        })
421    }
422}
423
424impl Iterator for ClosestBucketsIter {
425    type Item = BucketIndex;
426
427    fn next(&mut self) -> Option<Self::Item> {
428        match self.state {
429            ClosestBucketsIterState::Start(i) => {
430                self.state = ClosestBucketsIterState::ZoomIn(i);
431                Some(i)
432            }
433            ClosestBucketsIterState::ZoomIn(i) => {
434                if let Some(i) = self.next_in(i) {
435                    self.state = ClosestBucketsIterState::ZoomIn(i);
436                    Some(i)
437                } else {
438                    let i = BucketIndex(0);
439                    self.state = ClosestBucketsIterState::ZoomOut(i);
440                    Some(i)
441                }
442            }
443            ClosestBucketsIterState::ZoomOut(i) => {
444                if let Some(i) = self.next_out(i) {
445                    self.state = ClosestBucketsIterState::ZoomOut(i);
446                    Some(i)
447                } else {
448                    self.state = ClosestBucketsIterState::Done;
449                    None
450                }
451            }
452            ClosestBucketsIterState::Done => None,
453        }
454    }
455}
456
457impl<TTarget, TKey, TVal, TMap, TOut> Iterator for ClosestIter<'_, TTarget, TKey, TVal, TMap, TOut>
458where
459    TTarget: AsRef<KeyBytes>,
460    TKey: Clone + AsRef<KeyBytes>,
461    TVal: Clone,
462    TMap: Fn((&Node<TKey, TVal>, NodeStatus)) -> TOut,
463    TOut: AsRef<KeyBytes>,
464{
465    type Item = TOut;
466
467    fn next(&mut self) -> Option<Self::Item> {
468        loop {
469            let (mut buffer, bucket_index) = if let Some(mut iter) = self.iter.take() {
470                if let Some(next) = iter.next() {
471                    self.iter = Some(iter);
472                    return Some(next);
473                }
474
475                let bucket_index = self.buckets_iter.next()?;
476
477                // Reusing the same buffer so if there were any allocation, it only happen once over
478                // a `ClosestIter` life.
479                iter.buffer.clear();
480
481                (iter.buffer, bucket_index)
482            } else {
483                let bucket_index = self.buckets_iter.next()?;
484
485                // Allocation only occurs if `kbucket_size` is greater than `K_VALUE`.
486                (SmallVec::with_capacity(self.bucket_size), bucket_index)
487            };
488
489            let bucket = &mut self.table.buckets[bucket_index.get()];
490            if let Some(applied) = bucket.apply_pending() {
491                self.table.applied_pending.push_back(applied)
492            }
493
494            buffer.extend(
495                bucket
496                    .iter()
497                    .take(self.bucket_size)
498                    .map(|e| (self.fmap)(e))
499                    .map(Some),
500            );
501            buffer.sort_by(|a, b| {
502                let a = a.as_ref().expect("just initialized");
503                let b = b.as_ref().expect("just initialized");
504                self.target
505                    .as_ref()
506                    .distance(a.as_ref())
507                    .cmp(&self.target.as_ref().distance(b.as_ref()))
508            });
509
510            self.iter = Some(ClosestIterBuffer::new(buffer));
511        }
512    }
513}
514
515struct ClosestIterBuffer<TOut> {
516    buffer: SmallVec<[Option<TOut>; K_VALUE.get()]>,
517    index: usize,
518}
519
520impl<TOut> ClosestIterBuffer<TOut> {
521    fn new(buffer: SmallVec<[Option<TOut>; K_VALUE.get()]>) -> Self {
522        Self { buffer, index: 0 }
523    }
524}
525
526impl<TOut> Iterator for ClosestIterBuffer<TOut> {
527    type Item = TOut;
528
529    fn next(&mut self) -> Option<Self::Item> {
530        let entry = self.buffer.get_mut(self.index)?;
531        self.index += 1;
532        entry.take()
533    }
534}
535
536/// A reference to a bucket.
537pub struct KBucketRef<'a, TKey, TVal> {
538    index: BucketIndex,
539    bucket: &'a mut KBucket<TKey, TVal>,
540}
541
542impl<'a, TKey, TVal> KBucketRef<'a, TKey, TVal>
543where
544    TKey: Clone + AsRef<KeyBytes>,
545    TVal: Clone,
546{
547    /// Returns the minimum inclusive and maximum inclusive distance for
548    /// this bucket.
549    pub fn range(&self) -> (Distance, Distance) {
550        self.index.range()
551    }
552
553    /// Checks whether the bucket is empty.
554    pub fn is_empty(&self) -> bool {
555        self.num_entries() == 0
556    }
557
558    /// Returns the number of entries in the bucket.
559    pub fn num_entries(&self) -> usize {
560        self.bucket.num_entries()
561    }
562
563    /// Returns true if the bucket has a pending node.
564    pub fn has_pending(&self) -> bool {
565        self.bucket.pending().is_some_and(|n| !n.is_ready())
566    }
567
568    /// Tests whether the given distance falls into this bucket.
569    pub fn contains(&self, d: &Distance) -> bool {
570        BucketIndex::new(d).is_some_and(|i| i == self.index)
571    }
572
573    /// Generates a random distance that falls into this bucket.
574    ///
575    /// Together with a known key `a` (e.g. the local key), a random distance `d` for
576    /// this bucket w.r.t `k` gives rise to the corresponding (random) key `b` s.t.
577    /// the XOR distance between `a` and `b` is `d`. In other words, it gives
578    /// rise to a random key falling into this bucket. See [`key::Key::for_distance`].
579    pub fn rand_distance(&self, rng: &mut impl rand::Rng) -> Distance {
580        self.index.rand_distance(rng)
581    }
582
583    /// Returns an iterator over the entries in the bucket.
584    pub fn iter(&'a self) -> impl Iterator<Item = EntryRefView<'a, TKey, TVal>> {
585        self.bucket.iter().map(move |(n, status)| EntryRefView {
586            node: NodeRefView {
587                key: &n.key,
588                value: &n.value,
589            },
590            status,
591        })
592    }
593}
594
#[cfg(test)]
mod tests {
    use libp2p_identity::PeerId;
    use quickcheck::*;

    use super::*;

    type TestTable = KBucketsTable<KeyBytes, ()>;

    impl Arbitrary for TestTable {
        /// Generates a table with a random pending timeout and fewer than
        /// 100 randomly placed nodes, filling buckets from the furthest
        /// bucket towards the closest.
        fn arbitrary(g: &mut Gen) -> TestTable {
            let local_key = Key::from(PeerId::random());
            let timeout = Duration::from_secs(g.gen_range(1..360));
            let mut config = KBucketConfig::default();
            config.set_pending_timeout(timeout);
            let bucket_size = config.bucket_size;
            let mut table = TestTable::new(local_key.into(), config);
            let mut num_total = g.gen_range(0..100);
            for (i, bucket) in table.buckets.iter_mut().enumerate().rev() {
                let ix = BucketIndex(i);
                let num = g.gen_range(0..bucket_size.min(num_total) + 1);
                num_total -= num;
                for _ in 0..num {
                    let distance = ix.rand_distance(&mut rand::thread_rng());
                    let key = local_key.for_distance(distance);
                    let node = Node { key, value: () };
                    let status = NodeStatus::arbitrary(g);
                    let InsertResult::Inserted = bucket.insert(node, status) else {
                        panic!()
                    };
                }
            }
            table
        }
    }

    /// Consecutive buckets must cover adjacent distance ranges that together
    /// span everything from `1` to `U256::MAX`.
    #[test]
    fn buckets_are_non_overlapping_and_exhaustive() {
        let local_key = Key::from(PeerId::random());
        let mut config = KBucketConfig::default();
        config.set_pending_timeout(Duration::from_secs(0));
        let mut table = KBucketsTable::<KeyBytes, ()>::new(local_key.into(), config);

        let last_max = table.iter().fold(U256::from(0), |prev_max, bucket| {
            let (min, max) = bucket.range();
            assert_eq!(Distance(prev_max + U256::from(1)), min);
            max.0
        });

        assert_eq!(U256::MAX, last_max);
    }

    /// A bucket must contain both ends of its own range and nothing just
    /// outside of it.
    #[test]
    fn bucket_contains_range() {
        fn prop(ix: u8) {
            let mut config = KBucketConfig::default();
            config.set_pending_timeout(Duration::from_secs(0));
            let mut bucket = KBucket::<Key<PeerId>, ()>::new(config);
            let bucket_ref = KBucketRef {
                index: BucketIndex(ix as usize),
                bucket: &mut bucket,
            };

            let (min, max) = bucket_ref.range();

            assert!(min <= max);

            assert!(bucket_ref.contains(&min));
            assert!(bucket_ref.contains(&max));

            // Skip the lower neighbour when `min` is zero to avoid underflow.
            if min != Distance(0.into()) {
                assert!(!bucket_ref.contains(&Distance(min.0 - 1)));
            }

            // Skip the upper neighbour when `max` is the maximum to avoid overflow.
            if max != Distance(U256::MAX) {
                assert!(!bucket_ref.contains(&Distance(max.0 + 1)));
            }
        }

        quickcheck(prop as fn(_));
    }

    /// A random distance for bucket `ix` must lie in `[2^ix, 2^(ix+1) - 1]`.
    #[test]
    fn rand_distance() {
        fn prop(ix: u8) -> bool {
            let d = BucketIndex(ix as usize).rand_distance(&mut rand::thread_rng());
            let two = U256::from(2);
            let exponent = U256::from(ix);
            let lower = two.pow(exponent);
            let upper = two
                .checked_pow(exponent + U256::from(1))
                .unwrap_or(U256::MAX)
                - U256::from(1);
            lower <= d.0 && d.0 <= upper
        }
        quickcheck(prop as fn(_) -> _);
    }

    /// An inserted entry must be the sole result of a closest-keys query.
    #[test]
    fn entry_inserted() {
        let local_key = Key::from(PeerId::random());
        let other_id = Key::from(PeerId::random());

        let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default());
        let Some(Entry::Absent(entry)) = table.entry(&other_id) else {
            panic!()
        };
        let InsertResult::Inserted = entry.insert((), NodeStatus::Connected) else {
            panic!()
        };

        let res = table.closest_keys(&other_id).collect::<Vec<_>>();
        assert_eq!(res.len(), 1);
        assert_eq!(res[0], other_id);
    }

    /// The local key has no entry in its own routing table.
    #[test]
    fn entry_self() {
        let local_key = Key::from(PeerId::random());
        let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default());

        assert!(table.entry(&local_key).is_none())
    }

    /// `closest_keys` must agree with sorting a full-table scan by distance.
    #[test]
    fn closest() {
        let local_key = Key::from(PeerId::random());
        let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default());
        let mut count = 0;
        while count < 100 {
            let key = Key::from(PeerId::random());
            let Some(Entry::Absent(e)) = table.entry(&key) else {
                panic!("entry exists")
            };
            // Keys that land in a full bucket are simply skipped.
            if let InsertResult::Inserted = e.insert((), NodeStatus::Connected) {
                count += 1;
            }
        }

        let mut expected_keys: Vec<_> = table
            .buckets
            .iter()
            .flat_map(|t| t.iter().map(|(n, _)| n.key))
            .collect();

        for _ in 0..10 {
            let target_key = Key::from(PeerId::random());
            let keys = table.closest_keys(&target_key).collect::<Vec<_>>();
            // The list of keys is expected to match the result of a full-table scan.
            expected_keys.sort_by_key(|k| k.distance(&target_key));
            assert_eq!(keys, expected_keys);
        }
    }

    /// A pending entry whose timeout has expired must replace the
    /// disconnected entry and be reported exactly once.
    #[test]
    fn applied_pending() {
        let local_key = Key::from(PeerId::random());
        let mut config = KBucketConfig::default();
        config.set_pending_timeout(Duration::from_millis(1));
        let mut table = KBucketsTable::<_, ()>::new(local_key, config);

        // Insert disconnected nodes until some bucket is full, then insert
        // the rejected key as connected so that it becomes pending.
        let (expected_applied, full_bucket_index) = loop {
            let key = Key::from(PeerId::random());
            let Some(Entry::Absent(e)) = table.entry(&key) else {
                panic!("entry exists")
            };
            let InsertResult::Full = e.insert((), NodeStatus::Disconnected) else {
                continue;
            };
            let Some(Entry::Absent(e)) = table.entry(&key) else {
                panic!()
            };
            let InsertResult::Pending { disconnected } = e.insert((), NodeStatus::Connected)
            else {
                panic!()
            };
            let applied = AppliedPending {
                inserted: Node { key, value: () },
                evicted: Some(Node {
                    key: disconnected,
                    value: (),
                }),
            };
            break (applied, BucketIndex::new(&key.distance(&local_key)));
        };

        // Expire the timeout for the pending entry on the full bucket.
        let full_bucket = &mut table.buckets[full_bucket_index.unwrap().get()];
        let elapsed = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        full_bucket.pending_mut().unwrap().set_ready_at(elapsed);

        match table.entry(&expected_applied.inserted.key) {
            Some(Entry::Present(_, NodeStatus::Connected)) => {}
            x => panic!("Unexpected entry: {x:?}"),
        }

        match table.entry(&expected_applied.evicted.as_ref().unwrap().key) {
            Some(Entry::Absent(_)) => {}
            x => panic!("Unexpected entry: {x:?}"),
        }

        assert_eq!(Some(expected_applied), table.take_applied_pending());
        assert_eq!(None, table.take_applied_pending());
    }

    /// Flipping a single bit of the distance to the target must move the
    /// node count in the matching direction.
    #[test]
    fn count_nodes_between() {
        fn prop(mut table: TestTable, target: Key<PeerId>) -> bool {
            let num_to_target = table.count_nodes_between(&target);
            let distance = table.local_key.distance(&target);
            let base2 = U256::from(2);
            ClosestBucketsIter::new(distance).all(|i| {
                // Flip the distance bit related to the bucket.
                let d = Distance(distance.0 ^ (base2.pow(U256::from(i.get()))));
                let k = table.local_key.for_distance(d);
                if distance.0.bit(i.get()) {
                    // Bit flip `1` -> `0`, the key must be closer than `target`.
                    d < distance && table.count_nodes_between(&k) <= num_to_target
                } else {
                    // Bit flip `0` -> `1`, the key must be farther than `target`.
                    d > distance && table.count_nodes_between(&k) >= num_to_target
                }
            })
        }

        QuickCheck::new()
            .tests(10)
            .quickcheck(prop as fn(_, _) -> _)
    }
}