1mod bucket;
70mod entry;
71#[allow(clippy::ptr_offset_with_cast)]
72#[allow(clippy::assign_op_pattern)]
73mod key;
74
75use std::{collections::VecDeque, num::NonZeroUsize, time::Duration};
76
77use bucket::KBucket;
78pub use bucket::NodeStatus;
79pub use entry::*;
80use smallvec::SmallVec;
81use web_time::Instant;
82
83const NUM_BUCKETS: usize = 256;
85
86#[derive(Debug, Clone, Copy)]
88pub(crate) struct KBucketConfig {
89 bucket_size: usize,
91 pending_timeout: Duration,
95}
96
97impl Default for KBucketConfig {
98 fn default() -> Self {
99 KBucketConfig {
100 bucket_size: K_VALUE.get(),
101 pending_timeout: Duration::from_secs(60),
102 }
103 }
104}
105
106impl KBucketConfig {
107 pub(crate) fn set_bucket_size(&mut self, bucket_size: NonZeroUsize) {
109 self.bucket_size = bucket_size.get();
110 }
111
112 pub(crate) fn set_pending_timeout(&mut self, pending_timeout: Duration) {
116 self.pending_timeout = pending_timeout;
117 }
118}
119
120#[derive(Debug, Clone)]
122pub(crate) struct KBucketsTable<TKey, TVal> {
123 local_key: TKey,
125 buckets: Vec<KBucket<TKey, TVal>>,
127 bucket_size: usize,
129 applied_pending: VecDeque<AppliedPending<TKey, TVal>>,
132}
133
134#[derive(Debug, Copy, Clone, PartialEq, Eq)]
137struct BucketIndex(usize);
138
139impl BucketIndex {
140 fn new(d: &Distance) -> Option<BucketIndex> {
148 d.ilog2().map(|i| BucketIndex(i as usize))
149 }
150
151 fn get(&self) -> usize {
153 self.0
154 }
155
156 fn range(&self) -> (Distance, Distance) {
159 let min = Distance(U256::pow(U256::from(2), U256::from(self.0)));
160 if self.0 == usize::from(u8::MAX) {
161 (min, Distance(U256::MAX))
162 } else {
163 let max = Distance(U256::pow(U256::from(2), U256::from(self.0 + 1)) - 1);
164 (min, max)
165 }
166 }
167
168 fn rand_distance(&self, rng: &mut impl rand::Rng) -> Distance {
170 let mut bytes = [0u8; 32];
171 let quot = self.0 / 8;
172 for i in 0..quot {
173 bytes[31 - i] = rng.gen();
174 }
175 let rem = (self.0 % 8) as u32;
176 let lower = usize::pow(2, rem);
177 let upper = usize::pow(2, rem + 1);
178 bytes[31 - quot] = rng.gen_range(lower..upper) as u8;
179 Distance(U256::from_big_endian(bytes.as_slice()))
180 }
181}
182
183impl<TKey, TVal> KBucketsTable<TKey, TVal>
184where
185 TKey: Clone + AsRef<KeyBytes>,
186 TVal: Clone,
187{
188 pub(crate) fn new(local_key: TKey, config: KBucketConfig) -> Self {
191 KBucketsTable {
192 local_key,
193 buckets: (0..NUM_BUCKETS).map(|_| KBucket::new(config)).collect(),
194 bucket_size: config.bucket_size,
195 applied_pending: VecDeque::new(),
196 }
197 }
198
199 pub(crate) fn local_key(&self) -> &TKey {
201 &self.local_key
202 }
203
204 pub(crate) fn entry<'a>(&'a mut self, key: &'a TKey) -> Option<Entry<'a, TKey, TVal>> {
209 let index = BucketIndex::new(&self.local_key.as_ref().distance(key))?;
210
211 let bucket = &mut self.buckets[index.get()];
212 if let Some(applied) = bucket.apply_pending() {
213 self.applied_pending.push_back(applied)
214 }
215 Some(Entry::new(bucket, key))
216 }
217
218 pub(crate) fn iter(&mut self) -> impl Iterator<Item = KBucketRef<'_, TKey, TVal>> + '_ {
223 let applied_pending = &mut self.applied_pending;
224 self.buckets.iter_mut().enumerate().map(move |(i, b)| {
225 if let Some(applied) = b.apply_pending() {
226 applied_pending.push_back(applied)
227 }
228 KBucketRef {
229 index: BucketIndex(i),
230 bucket: b,
231 }
232 })
233 }
234
235 pub(crate) fn bucket<K>(&mut self, key: &K) -> Option<KBucketRef<'_, TKey, TVal>>
239 where
240 K: AsRef<KeyBytes>,
241 {
242 let d = self.local_key.as_ref().distance(key);
243 if let Some(index) = BucketIndex::new(&d) {
244 let bucket = &mut self.buckets[index.0];
245 if let Some(applied) = bucket.apply_pending() {
246 self.applied_pending.push_back(applied)
247 }
248 Some(KBucketRef { bucket, index })
249 } else {
250 None
251 }
252 }
253
254 pub(crate) fn take_applied_pending(&mut self) -> Option<AppliedPending<TKey, TVal>> {
267 self.applied_pending.pop_front()
268 }
269
270 pub(crate) fn closest_keys<'a, T>(
273 &'a mut self,
274 target: &'a T,
275 ) -> impl Iterator<Item = TKey> + 'a
276 where
277 T: AsRef<KeyBytes>,
278 {
279 let distance = self.local_key.as_ref().distance(target);
280 let bucket_size = self.bucket_size;
281 ClosestIter {
282 target,
283 iter: None,
284 table: self,
285 buckets_iter: ClosestBucketsIter::new(distance),
286 fmap: |(n, _status): (&Node<TKey, TVal>, NodeStatus)| n.key.clone(),
287 bucket_size,
288 }
289 }
290
291 pub(crate) fn closest<'a, T>(
294 &'a mut self,
295 target: &'a T,
296 ) -> impl Iterator<Item = EntryView<TKey, TVal>> + 'a
297 where
298 T: Clone + AsRef<KeyBytes>,
299 TVal: Clone,
300 {
301 let distance = self.local_key.as_ref().distance(target);
302 let bucket_size = self.bucket_size;
303 ClosestIter {
304 target,
305 iter: None,
306 table: self,
307 buckets_iter: ClosestBucketsIter::new(distance),
308 fmap: |(n, status): (&Node<TKey, TVal>, NodeStatus)| EntryView {
309 node: n.clone(),
310 status,
311 },
312 bucket_size,
313 }
314 }
315
316 pub(crate) fn count_nodes_between<T>(&mut self, target: &T) -> usize
322 where
323 T: AsRef<KeyBytes>,
324 {
325 let local_key = self.local_key.clone();
326 let distance = target.as_ref().distance(&local_key);
327 let mut iter = ClosestBucketsIter::new(distance).take_while(|i| i.get() != 0);
328 if let Some(i) = iter.next() {
329 let num_first = self.buckets[i.get()]
330 .iter()
331 .filter(|(n, _)| n.key.as_ref().distance(&local_key) <= distance)
332 .count();
333 let num_rest: usize = iter.map(|i| self.buckets[i.get()].num_entries()).sum();
334 num_first + num_rest
335 } else {
336 0
337 }
338 }
339}
340
341struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> {
344 target: &'a TTarget,
349 table: &'a mut KBucketsTable<TKey, TVal>,
351 buckets_iter: ClosestBucketsIter,
354 iter: Option<ClosestIterBuffer<TOut>>,
356 fmap: TMap,
359 bucket_size: usize,
361}
362
363struct ClosestBucketsIter {
367 distance: Distance,
369 state: ClosestBucketsIterState,
371}
372
373enum ClosestBucketsIterState {
375 Start(BucketIndex),
378 ZoomIn(BucketIndex),
384 ZoomOut(BucketIndex),
390 Done,
392}
393
394impl ClosestBucketsIter {
395 fn new(distance: Distance) -> Self {
396 let state = match BucketIndex::new(&distance) {
397 Some(i) => ClosestBucketsIterState::Start(i),
398 None => ClosestBucketsIterState::Start(BucketIndex(0)),
399 };
400 Self { distance, state }
401 }
402
403 fn next_in(&self, i: BucketIndex) -> Option<BucketIndex> {
404 (0..i.get()).rev().find_map(|i| {
405 if self.distance.0.bit(i) {
406 Some(BucketIndex(i))
407 } else {
408 None
409 }
410 })
411 }
412
413 fn next_out(&self, i: BucketIndex) -> Option<BucketIndex> {
414 (i.get() + 1..NUM_BUCKETS).find_map(|i| {
415 if !self.distance.0.bit(i) {
416 Some(BucketIndex(i))
417 } else {
418 None
419 }
420 })
421 }
422}
423
424impl Iterator for ClosestBucketsIter {
425 type Item = BucketIndex;
426
427 fn next(&mut self) -> Option<Self::Item> {
428 match self.state {
429 ClosestBucketsIterState::Start(i) => {
430 self.state = ClosestBucketsIterState::ZoomIn(i);
431 Some(i)
432 }
433 ClosestBucketsIterState::ZoomIn(i) => {
434 if let Some(i) = self.next_in(i) {
435 self.state = ClosestBucketsIterState::ZoomIn(i);
436 Some(i)
437 } else {
438 let i = BucketIndex(0);
439 self.state = ClosestBucketsIterState::ZoomOut(i);
440 Some(i)
441 }
442 }
443 ClosestBucketsIterState::ZoomOut(i) => {
444 if let Some(i) = self.next_out(i) {
445 self.state = ClosestBucketsIterState::ZoomOut(i);
446 Some(i)
447 } else {
448 self.state = ClosestBucketsIterState::Done;
449 None
450 }
451 }
452 ClosestBucketsIterState::Done => None,
453 }
454 }
455}
456
457impl<TTarget, TKey, TVal, TMap, TOut> Iterator for ClosestIter<'_, TTarget, TKey, TVal, TMap, TOut>
458where
459 TTarget: AsRef<KeyBytes>,
460 TKey: Clone + AsRef<KeyBytes>,
461 TVal: Clone,
462 TMap: Fn((&Node<TKey, TVal>, NodeStatus)) -> TOut,
463 TOut: AsRef<KeyBytes>,
464{
465 type Item = TOut;
466
467 fn next(&mut self) -> Option<Self::Item> {
468 loop {
469 let (mut buffer, bucket_index) = if let Some(mut iter) = self.iter.take() {
470 if let Some(next) = iter.next() {
471 self.iter = Some(iter);
472 return Some(next);
473 }
474
475 let bucket_index = self.buckets_iter.next()?;
476
477 iter.buffer.clear();
480
481 (iter.buffer, bucket_index)
482 } else {
483 let bucket_index = self.buckets_iter.next()?;
484
485 (SmallVec::with_capacity(self.bucket_size), bucket_index)
487 };
488
489 let bucket = &mut self.table.buckets[bucket_index.get()];
490 if let Some(applied) = bucket.apply_pending() {
491 self.table.applied_pending.push_back(applied)
492 }
493
494 buffer.extend(
495 bucket
496 .iter()
497 .take(self.bucket_size)
498 .map(|e| (self.fmap)(e))
499 .map(Some),
500 );
501 buffer.sort_by(|a, b| {
502 let a = a.as_ref().expect("just initialized");
503 let b = b.as_ref().expect("just initialized");
504 self.target
505 .as_ref()
506 .distance(a.as_ref())
507 .cmp(&self.target.as_ref().distance(b.as_ref()))
508 });
509
510 self.iter = Some(ClosestIterBuffer::new(buffer));
511 }
512 }
513}
514
515struct ClosestIterBuffer<TOut> {
516 buffer: SmallVec<[Option<TOut>; K_VALUE.get()]>,
517 index: usize,
518}
519
520impl<TOut> ClosestIterBuffer<TOut> {
521 fn new(buffer: SmallVec<[Option<TOut>; K_VALUE.get()]>) -> Self {
522 Self { buffer, index: 0 }
523 }
524}
525
526impl<TOut> Iterator for ClosestIterBuffer<TOut> {
527 type Item = TOut;
528
529 fn next(&mut self) -> Option<Self::Item> {
530 let entry = self.buffer.get_mut(self.index)?;
531 self.index += 1;
532 entry.take()
533 }
534}
535
536pub struct KBucketRef<'a, TKey, TVal> {
538 index: BucketIndex,
539 bucket: &'a mut KBucket<TKey, TVal>,
540}
541
542impl<'a, TKey, TVal> KBucketRef<'a, TKey, TVal>
543where
544 TKey: Clone + AsRef<KeyBytes>,
545 TVal: Clone,
546{
547 pub fn range(&self) -> (Distance, Distance) {
550 self.index.range()
551 }
552
553 pub fn is_empty(&self) -> bool {
555 self.num_entries() == 0
556 }
557
558 pub fn num_entries(&self) -> usize {
560 self.bucket.num_entries()
561 }
562
563 pub fn has_pending(&self) -> bool {
565 self.bucket.pending().is_some_and(|n| !n.is_ready())
566 }
567
568 pub fn contains(&self, d: &Distance) -> bool {
570 BucketIndex::new(d).is_some_and(|i| i == self.index)
571 }
572
573 pub fn rand_distance(&self, rng: &mut impl rand::Rng) -> Distance {
580 self.index.rand_distance(rng)
581 }
582
583 pub fn iter(&'a self) -> impl Iterator<Item = EntryRefView<'a, TKey, TVal>> {
585 self.bucket.iter().map(move |(n, status)| EntryRefView {
586 node: NodeRefView {
587 key: &n.key,
588 value: &n.value,
589 },
590 status,
591 })
592 }
593}
594
595#[cfg(test)]
596mod tests {
597 use libp2p_identity::PeerId;
598 use quickcheck::*;
599
600 use super::*;
601
602 type TestTable = KBucketsTable<KeyBytes, ()>;
603
604 impl Arbitrary for TestTable {
605 fn arbitrary(g: &mut Gen) -> TestTable {
606 let local_key = Key::from(PeerId::random());
607 let timeout = Duration::from_secs(g.gen_range(1..360));
608 let mut config = KBucketConfig::default();
609 config.set_pending_timeout(timeout);
610 let bucket_size = config.bucket_size;
611 let mut table = TestTable::new(local_key.into(), config);
612 let mut num_total = g.gen_range(0..100);
613 for (i, b) in &mut table.buckets.iter_mut().enumerate().rev() {
614 let ix = BucketIndex(i);
615 let num = g.gen_range(0..usize::min(bucket_size, num_total) + 1);
616 num_total -= num;
617 for _ in 0..num {
618 let distance = ix.rand_distance(&mut rand::thread_rng());
619 let key = local_key.for_distance(distance);
620 let node = Node { key, value: () };
621 let status = NodeStatus::arbitrary(g);
622 match b.insert(node, status) {
623 InsertResult::Inserted => {}
624 _ => panic!(),
625 }
626 }
627 }
628 table
629 }
630 }
631
632 #[test]
633 fn buckets_are_non_overlapping_and_exhaustive() {
634 let local_key = Key::from(PeerId::random());
635 let timeout = Duration::from_secs(0);
636 let mut config = KBucketConfig::default();
637 config.set_pending_timeout(timeout);
638 let mut table = KBucketsTable::<KeyBytes, ()>::new(local_key.into(), config);
639
640 let mut prev_max = U256::from(0);
641
642 for bucket in table.iter() {
643 let (min, max) = bucket.range();
644 assert_eq!(Distance(prev_max + U256::from(1)), min);
645 prev_max = max.0;
646 }
647
648 assert_eq!(U256::MAX, prev_max);
649 }
650
651 #[test]
652 fn bucket_contains_range() {
653 fn prop(ix: u8) {
654 let index = BucketIndex(ix as usize);
655 let mut config = KBucketConfig::default();
656 config.set_pending_timeout(Duration::from_secs(0));
657 let mut bucket = KBucket::<Key<PeerId>, ()>::new(config);
658 let bucket_ref = KBucketRef {
659 index,
660 bucket: &mut bucket,
661 };
662
663 let (min, max) = bucket_ref.range();
664
665 assert!(min <= max);
666
667 assert!(bucket_ref.contains(&min));
668 assert!(bucket_ref.contains(&max));
669
670 if min != Distance(0.into()) {
671 assert!(!bucket_ref.contains(&Distance(min.0 - 1)));
673 }
674
675 if max != Distance(U256::MAX) {
676 assert!(!bucket_ref.contains(&Distance(max.0 + 1)));
678 }
679 }
680
681 quickcheck(prop as fn(_));
682 }
683
684 #[test]
685 fn rand_distance() {
686 fn prop(ix: u8) -> bool {
687 let d = BucketIndex(ix as usize).rand_distance(&mut rand::thread_rng());
688 let n = d.0;
689 let b = U256::from(2);
690 let e = U256::from(ix);
691 let lower = b.pow(e);
692 let upper = b.checked_pow(e + U256::from(1)).unwrap_or(U256::MAX) - U256::from(1);
693 lower <= n && n <= upper
694 }
695 quickcheck(prop as fn(_) -> _);
696 }
697
698 #[test]
699 fn entry_inserted() {
700 let local_key = Key::from(PeerId::random());
701 let other_id = Key::from(PeerId::random());
702
703 let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default());
704 if let Some(Entry::Absent(entry)) = table.entry(&other_id) {
705 match entry.insert((), NodeStatus::Connected) {
706 InsertResult::Inserted => (),
707 _ => panic!(),
708 }
709 } else {
710 panic!()
711 }
712
713 let res = table.closest_keys(&other_id).collect::<Vec<_>>();
714 assert_eq!(res.len(), 1);
715 assert_eq!(res[0], other_id);
716 }
717
718 #[test]
719 fn entry_self() {
720 let local_key = Key::from(PeerId::random());
721 let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default());
722
723 assert!(table.entry(&local_key).is_none())
724 }
725
726 #[test]
727 fn closest() {
728 let local_key = Key::from(PeerId::random());
729 let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default());
730 let mut count = 0;
731 loop {
732 if count == 100 {
733 break;
734 }
735 let key = Key::from(PeerId::random());
736 if let Some(Entry::Absent(e)) = table.entry(&key) {
737 match e.insert((), NodeStatus::Connected) {
738 InsertResult::Inserted => count += 1,
739 _ => continue,
740 }
741 } else {
742 panic!("entry exists")
743 }
744 }
745
746 let mut expected_keys: Vec<_> = table
747 .buckets
748 .iter()
749 .flat_map(|t| t.iter().map(|(n, _)| n.key))
750 .collect();
751
752 for _ in 0..10 {
753 let target_key = Key::from(PeerId::random());
754 let keys = table.closest_keys(&target_key).collect::<Vec<_>>();
755 expected_keys.sort_by_key(|k| k.distance(&target_key));
757 assert_eq!(keys, expected_keys);
758 }
759 }
760
761 #[test]
762 fn applied_pending() {
763 let local_key = Key::from(PeerId::random());
764 let mut config = KBucketConfig::default();
765 config.set_pending_timeout(Duration::from_millis(1));
766 let mut table = KBucketsTable::<_, ()>::new(local_key, config);
767 let expected_applied;
768 let full_bucket_index;
769 loop {
770 let key = Key::from(PeerId::random());
771 if let Some(Entry::Absent(e)) = table.entry(&key) {
772 match e.insert((), NodeStatus::Disconnected) {
773 InsertResult::Full => {
774 if let Some(Entry::Absent(e)) = table.entry(&key) {
775 match e.insert((), NodeStatus::Connected) {
776 InsertResult::Pending { disconnected } => {
777 expected_applied = AppliedPending {
778 inserted: Node { key, value: () },
779 evicted: Some(Node {
780 key: disconnected,
781 value: (),
782 }),
783 };
784 full_bucket_index = BucketIndex::new(&key.distance(&local_key));
785 break;
786 }
787 _ => panic!(),
788 }
789 } else {
790 panic!()
791 }
792 }
793 _ => continue,
794 }
795 } else {
796 panic!("entry exists")
797 }
798 }
799
800 let full_bucket = &mut table.buckets[full_bucket_index.unwrap().get()];
802 let elapsed = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
803 full_bucket.pending_mut().unwrap().set_ready_at(elapsed);
804
805 match table.entry(&expected_applied.inserted.key) {
806 Some(Entry::Present(_, NodeStatus::Connected)) => {}
807 x => panic!("Unexpected entry: {x:?}"),
808 }
809
810 match table.entry(&expected_applied.evicted.as_ref().unwrap().key) {
811 Some(Entry::Absent(_)) => {}
812 x => panic!("Unexpected entry: {x:?}"),
813 }
814
815 assert_eq!(Some(expected_applied), table.take_applied_pending());
816 assert_eq!(None, table.take_applied_pending());
817 }
818
819 #[test]
820 fn count_nodes_between() {
821 fn prop(mut table: TestTable, target: Key<PeerId>) -> bool {
822 let num_to_target = table.count_nodes_between(&target);
823 let distance = table.local_key.distance(&target);
824 let base2 = U256::from(2);
825 let mut iter = ClosestBucketsIter::new(distance);
826 iter.all(|i| {
827 let d = Distance(distance.0 ^ (base2.pow(U256::from(i.get()))));
829 let k = table.local_key.for_distance(d);
830 if distance.0.bit(i.get()) {
831 d < distance && table.count_nodes_between(&k) <= num_to_target
833 } else {
834 d > distance && table.count_nodes_between(&k) >= num_to_target
836 }
837 })
838 }
839
840 QuickCheck::new()
841 .tests(10)
842 .quickcheck(prop as fn(_, _) -> _)
843 }
844}