mod bucket;
mod entry;
mod key;
pub use entry::*;
use arrayvec::{self, ArrayVec};
use bucket::KBucket;
use std::collections::VecDeque;
use std::time::{Duration, Instant};
/// Number of buckets in a `KBucketsTable`.
///
/// `KBucketsTable::new` allocates exactly this many buckets, and it is the
/// exclusive upper bound used when `ClosestBucketsIter` searches for the next
/// bucket at a higher index.
const NUM_BUCKETS: usize = 256;
/// A table of `NUM_BUCKETS` buckets in which a key is assigned to a bucket
/// according to its distance from `local_key`.
#[derive(Debug, Clone)]
pub struct KBucketsTable<TKey, TVal> {
/// The key that all distances in this table are measured from.
local_key: TKey,
/// The buckets, addressed by `BucketIndex`.
buckets: Vec<KBucket<TKey, TVal>>,
/// Results of `KBucket::apply_pending` that were collected while accessing
/// buckets and have not yet been handed out by `take_applied_pending`.
applied_pending: VecDeque<AppliedPending<TKey, TVal>>
}
/// The index of a bucket inside a `KBucketsTable`.
///
/// `BucketIndex::new` derives the index from `Distance::ilog2`, and
/// `BucketIndex::range` reports bucket `i` as covering the distances from
/// `2^i` up to `2^(i+1) - 1` (or `U256::MAX` for index 255).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct BucketIndex(usize);
impl BucketIndex {
fn new(d: &Distance) -> Option<BucketIndex> {
d.ilog2().map(|i| BucketIndex(i as usize))
}
fn get(&self) -> usize {
self.0
}
fn range(&self) -> (Distance, Distance) {
let min = Distance(U256::pow(U256::from(2), U256::from(self.0)));
if self.0 == usize::from(u8::MAX) {
(min, Distance(U256::MAX))
} else {
let max = Distance(U256::pow(U256::from(2), U256::from(self.0 + 1)) - 1);
(min, max)
}
}
fn rand_distance(&self, rng: &mut impl rand::Rng) -> Distance {
let mut bytes = [0u8; 32];
let quot = self.0 / 8;
for i in 0 .. quot {
bytes[31 - i] = rng.gen();
}
let rem = (self.0 % 8) as u32;
let lower = usize::pow(2, rem);
let upper = usize::pow(2, rem + 1);
bytes[31 - quot] = rng.gen_range(lower, upper) as u8;
Distance(U256::from(bytes))
}
}
impl<TKey, TVal> KBucketsTable<TKey, TVal>
where
    TKey: Clone + AsRef<KeyBytes>,
    TVal: Clone
{
    /// Creates a table of `NUM_BUCKETS` empty buckets around `local_key`.
    ///
    /// `pending_timeout` is passed unchanged to every `KBucket::new`; its exact
    /// meaning is defined by the `bucket` module.
    pub fn new(local_key: TKey, pending_timeout: Duration) -> Self {
        KBucketsTable {
            local_key,
            buckets: (0 .. NUM_BUCKETS).map(|_| KBucket::new(pending_timeout)).collect(),
            applied_pending: VecDeque::new()
        }
    }

    /// Returns the key that all distances in this table are measured from.
    pub fn local_key(&self) -> &TKey {
        &self.local_key
    }

    /// Returns an `Entry` for `key`, describing its state in the table.
    ///
    /// Before the entry is created, any pending entry of the responsible bucket
    /// is applied and the outcome is queued for `take_applied_pending`.
    ///
    /// Returns `Entry::SelfEntry` when no bucket index exists for the distance
    /// between `key` and the local key, which is the case for the local key
    /// itself.
    pub fn entry<'a>(&'a mut self, key: &'a TKey) -> Entry<'a, TKey, TVal> {
        let distance = self.local_key.as_ref().distance(key);
        match BucketIndex::new(&distance) {
            Some(index) => {
                let bucket = &mut self.buckets[index.get()];
                if let Some(applied) = bucket.apply_pending() {
                    self.applied_pending.push_back(applied)
                }
                Entry::new(bucket, key)
            }
            None => Entry::SelfEntry
        }
    }

    /// Returns an iterator over all buckets, in order of increasing bucket index.
    ///
    /// As each bucket is visited, its pending entry (if any) is applied and the
    /// outcome is queued for `take_applied_pending`.
    pub fn iter<'a>(&'a mut self) -> impl Iterator<Item = KBucketRef<'a, TKey, TVal>> + 'a {
        let applied_pending = &mut self.applied_pending;
        self.buckets.iter_mut().enumerate().map(move |(i, b)| {
            if let Some(applied) = b.apply_pending() {
                applied_pending.push_back(applied)
            }
            KBucketRef {
                index: BucketIndex(i),
                bucket: b
            }
        })
    }

    /// Returns the bucket responsible for `key`.
    ///
    /// The bucket's pending entry (if any) is applied first and the outcome is
    /// queued for `take_applied_pending`.
    ///
    /// Returns `None` when no bucket index exists for the distance between
    /// `key` and the local key.
    pub fn bucket<K>(&mut self, key: &K) -> Option<KBucketRef<'_, TKey, TVal>>
    where
        K: AsRef<KeyBytes>,
    {
        let distance = self.local_key.as_ref().distance(key);
        let index = BucketIndex::new(&distance)?;
        let bucket = &mut self.buckets[index.get()];
        if let Some(applied) = bucket.apply_pending() {
            self.applied_pending.push_back(applied)
        }
        Some(KBucketRef { index, bucket })
    }

    /// Removes and returns the oldest queued result of applying a pending
    /// entry, or `None` if the queue is empty.
    ///
    /// Results are queued by `entry`, `iter`, `bucket` and by the iterators
    /// returned from `closest_keys` and `closest`.
    pub fn take_applied_pending(&mut self) -> Option<AppliedPending<TKey, TVal>> {
        self.applied_pending.pop_front()
    }

    /// Returns an iterator over the keys in the table, starting with the
    /// buckets closest to `target`.
    ///
    /// Buckets are visited in the order given by `ClosestBucketsIter`; within a
    /// bucket the keys are sorted by their distance to `target`. Pending
    /// entries of visited buckets are applied on the way.
    pub fn closest_keys<'a, T>(&'a mut self, target: &'a T)
        -> impl Iterator<Item = TKey> + 'a
    where
        T: Clone + AsRef<KeyBytes>
    {
        let distance = self.local_key.as_ref().distance(target);
        ClosestIter {
            target,
            iter: None,
            table: self,
            buckets_iter: ClosestBucketsIter::new(distance),
            fmap: |b: &KBucket<TKey, _>| -> ArrayVec<_> {
                b.iter().map(|(n,_)| n.key.clone()).collect()
            }
        }
    }

    /// Returns an iterator over the nodes in the table together with their
    /// status, starting with the buckets closest to `target`.
    ///
    /// The visiting order is the same as for `closest_keys`; each item is an
    /// `EntryView` holding a clone of the node.
    pub fn closest<'a, T>(&'a mut self, target: &'a T)
        -> impl Iterator<Item = EntryView<TKey, TVal>> + 'a
    where
        T: Clone + AsRef<KeyBytes>
    {
        let distance = self.local_key.as_ref().distance(target);
        ClosestIter {
            target,
            iter: None,
            table: self,
            buckets_iter: ClosestBucketsIter::new(distance),
            fmap: |b: &KBucket<_, TVal>| -> ArrayVec<_> {
                b.iter().map(|(n, status)| EntryView {
                    node: n.clone(),
                    status
                }).collect()
            }
        }
    }

    /// Counts entries of the table that lie between the local key and `target`.
    ///
    /// Buckets are walked in `ClosestBucketsIter` order until bucket 0 is
    /// reached (bucket 0 itself is not counted). In the first bucket only the
    /// entries whose distance to the local key does not exceed the distance of
    /// `target` are counted; every further bucket contributes all its entries.
    pub fn count_nodes_between<T>(&mut self, target: &T) -> usize
    where
        T: AsRef<KeyBytes>
    {
        // Only shared access is needed below, so borrow the key rather than clone it.
        let local_key = &self.local_key;
        let distance = target.as_ref().distance(local_key);
        let mut iter = ClosestBucketsIter::new(distance).take_while(|i| i.get() != 0);
        match iter.next() {
            Some(i) => {
                let num_first = self.buckets[i.get()].iter()
                    .filter(|(n,_)| n.key.as_ref().distance(local_key) <= distance)
                    .count();
                let num_rest: usize = iter.map(|i| self.buckets[i.get()].num_entries()).sum();
                num_first + num_rest
            }
            None => 0
        }
    }
}
/// An iterator over items produced from the table's buckets, visiting the
/// buckets in the order given by `ClosestBucketsIter` and sorting the items of
/// each bucket by their distance to `target`.
struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> {
/// The key to which distances are measured when ordering the items of a bucket.
target: &'a TTarget,
/// The table whose buckets are visited; borrowed mutably because pending
/// entries are applied as buckets are visited.
table: &'a mut KBucketsTable<TKey, TVal>,
/// Yields the indices of the buckets to visit, in visiting order.
buckets_iter: ClosestBucketsIter,
/// The sorted items of the bucket currently being drained, if any.
iter: Option<arrayvec::IntoIter<[TOut; K_VALUE.get()]>>,
/// Maps a bucket to the items that should be yielded for it.
fmap: TMap
}
/// An iterator over bucket indices, ordered relative to a fixed distance.
struct ClosestBucketsIter {
/// The distance whose bits decide which bucket index is yielded next.
distance: Distance,
/// The current position of the traversal.
state: ClosestBucketsIterState
}
/// The traversal state of a `ClosestBucketsIter`.
enum ClosestBucketsIterState {
/// Nothing has been yielded yet; the contained index is yielded first.
Start(BucketIndex),
/// The contained index was yielded last; the traversal continues towards
/// lower indices whose bit is set in the distance.
ZoomIn(BucketIndex),
/// The traversal continues from the contained index towards higher indices
/// whose bit is not set in the distance.
ZoomOut(BucketIndex),
/// The traversal is finished; nothing more is yielded.
Done
}
impl ClosestBucketsIter {
fn new(distance: Distance) -> Self {
let state = match BucketIndex::new(&distance) {
Some(i) => ClosestBucketsIterState::Start(i),
None => ClosestBucketsIterState::Start(BucketIndex(0))
};
Self { distance, state }
}
fn next_in(&self, i: BucketIndex) -> Option<BucketIndex> {
(0 .. i.get()).rev().find_map(|i|
if self.distance.0.bit(i) {
Some(BucketIndex(i))
} else {
None
})
}
fn next_out(&self, i: BucketIndex) -> Option<BucketIndex> {
(i.get() + 1 .. NUM_BUCKETS).find_map(|i|
if !self.distance.0.bit(i) {
Some(BucketIndex(i))
} else {
None
})
}
}
/// Yields bucket indices in three phases:
///
/// 1. the start index,
/// 2. "zoom in": lower indices whose bit is set in the distance, from high to
///    low, followed by bucket 0 if it has not been yielded yet,
/// 3. "zoom out": higher indices whose bit is not set in the distance, from
///    low to high.
///
/// Every bucket index is yielded at most once.
impl Iterator for ClosestBucketsIter {
    type Item = BucketIndex;

    fn next(&mut self) -> Option<Self::Item> {
        match self.state {
            ClosestBucketsIterState::Start(i) => {
                self.state = ClosestBucketsIterState::ZoomIn(i);
                Some(i)
            }
            ClosestBucketsIterState::ZoomIn(i) => {
                if let Some(next) = self.next_in(i) {
                    self.state = ClosestBucketsIterState::ZoomIn(next);
                    Some(next)
                } else if i.get() == 0 {
                    // `ZoomIn(i)` is only entered right after `i` was yielded, so
                    // bucket 0 has already been emitted (either as the start bucket
                    // or because bit 0 of the distance is set). Emitting it again
                    // would make callers process that bucket twice, so continue
                    // directly with the zoom-out phase.
                    self.state = ClosestBucketsIterState::ZoomOut(i);
                    self.next()
                } else {
                    // No lower set bit is left: bucket 0 closes the zoom-in phase
                    // and is the starting point for zooming out.
                    let zero = BucketIndex(0);
                    self.state = ClosestBucketsIterState::ZoomOut(zero);
                    Some(zero)
                }
            }
            ClosestBucketsIterState::ZoomOut(i) => {
                if let Some(next) = self.next_out(i) {
                    self.state = ClosestBucketsIterState::ZoomOut(next);
                    Some(next)
                } else {
                    self.state = ClosestBucketsIterState::Done;
                    None
                }
            }
            ClosestBucketsIterState::Done => None
        }
    }
}
/// Yields the mapped items of each bucket, bucket by bucket, in the order
/// given by `buckets_iter`, with the items of a bucket sorted by their
/// distance to `target`.
impl<TTarget, TKey, TVal, TMap, TOut> Iterator
for ClosestIter<'_, TTarget, TKey, TVal, TMap, TOut>
where
    TTarget: AsRef<KeyBytes>,
    TKey: Clone + AsRef<KeyBytes>,
    TVal: Clone,
    TMap: Fn(&KBucket<TKey, TVal>) -> ArrayVec<[TOut; K_VALUE.get()]>,
    TOut: AsRef<KeyBytes>
{
    type Item = TOut;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Hand out what is left of the bucket currently being drained.
            if let Some(current) = self.iter.as_mut() {
                if let Some(item) = current.next() {
                    return Some(item);
                }
                self.iter = None;
            }

            // Move on to the next bucket; the iteration ends when none is left.
            let index = self.buckets_iter.next()?;
            let bucket = &mut self.table.buckets[index.get()];
            if let Some(applied) = bucket.apply_pending() {
                self.table.applied_pending.push_back(applied)
            }

            let mut items = (self.fmap)(bucket);
            let target: &KeyBytes = self.target.as_ref();
            items.sort_by(|a, b| {
                let to_a = target.distance(a.as_ref());
                let to_b = target.distance(b.as_ref());
                to_a.cmp(&to_b)
            });
            self.iter = Some(items.into_iter());
        }
    }
}
/// A mutable reference to a bucket of a `KBucketsTable`, paired with the
/// bucket's index.
pub struct KBucketRef<'a, TKey, TVal> {
/// The index of the referenced bucket within the table.
index: BucketIndex,
/// The referenced bucket.
bucket: &'a mut KBucket<TKey, TVal>
}
impl<'a, TKey, TVal> KBucketRef<'a, TKey, TVal>
where
    TKey: Clone + AsRef<KeyBytes>,
    TVal: Clone
{
    /// Returns the inclusive `(min, max)` range of distances covered by this
    /// bucket, as determined by its index.
    pub fn range(&self) -> (Distance, Distance) {
        self.index.range()
    }

    /// Returns `true` if the bucket holds no entries.
    pub fn is_empty(&self) -> bool {
        self.bucket.num_entries() == 0
    }

    /// Returns the number of entries in the bucket.
    pub fn num_entries(&self) -> usize {
        self.bucket.num_entries()
    }

    /// Returns `true` if the bucket has a pending entry that is not ready yet.
    pub fn has_pending(&self) -> bool {
        match self.bucket.pending() {
            Some(pending) => !pending.is_ready(),
            None => false
        }
    }

    /// Returns `true` if the distance `d` maps to the index of this bucket.
    pub fn contains(&self, d: &Distance) -> bool {
        BucketIndex::new(d) == Some(self.index)
    }

    /// Draws a random distance that falls into this bucket.
    pub fn rand_distance(&self, rng: &mut impl rand::Rng) -> Distance {
        self.index.rand_distance(rng)
    }

    /// Returns an iterator over borrowed views of the bucket's entries,
    /// each consisting of the node's key and value and the node's status.
    pub fn iter(&'a self) -> impl Iterator<Item = EntryRefView<'a, TKey, TVal>> {
        self.bucket.iter().map(move |(n, status)| {
            let node = NodeRefView {
                key: &n.key,
                value: &n.value
            };
            EntryRefView { node, status }
        })
    }
}
#[cfg(test)]
mod tests {
use super::*;
use libp2p_core::PeerId;
use quickcheck::*;
use rand::Rng;
// Table type used by the property-based tests below.
type TestTable = KBucketsTable<KeyBytes, ()>;
// Generates a randomly populated table: a random local key and timeout, and a
// randomly chosen total number of entries that is spent on the buckets from the
// highest index downwards, never placing more than `K_VALUE` entries in a bucket.
impl Arbitrary for TestTable {
fn arbitrary<G: Gen>(g: &mut G) -> TestTable {
let local_key = Key::from(PeerId::random());
let timeout = Duration::from_secs(g.gen_range(1, 360));
let mut table = TestTable::new(local_key.clone().into(), timeout);
// Remaining number of entries that may still be inserted.
let mut num_total = g.gen_range(0, 100);
for (i, b) in &mut table.buckets.iter_mut().enumerate().rev() {
let ix = BucketIndex(i);
let num = g.gen_range(0, usize::min(K_VALUE.get(), num_total) + 1);
num_total -= num;
for _ in 0 .. num {
// Pick a key whose distance to the local key falls into bucket `ix`.
let distance = ix.rand_distance(g);
let key = local_key.for_distance(distance);
let node = Node { key: key.clone(), value: () };
let status = NodeStatus::arbitrary(g);
match b.insert(node, status) {
InsertResult::Inserted => {}
_ => panic!()
}
}
}
table
}
}
// The ranges of consecutive buckets must be adjacent (each minimum is the
// previous maximum plus one), start at distance 1 and end at `U256::MAX`.
#[test]
fn buckets_are_non_overlapping_and_exhaustive() {
let local_key = Key::from(PeerId::random());
let timeout = Duration::from_secs(0);
let mut table = KBucketsTable::<KeyBytes, ()>::new(local_key.into(), timeout);
let mut prev_max = U256::from(0);
for bucket in table.iter() {
let (min, max) = bucket.range();
assert_eq!(Distance(prev_max + U256::from(1)), min);
prev_max = max.0;
}
assert_eq!(U256::MAX, prev_max);
}
// A bucket must contain both ends of its range and nothing directly outside of it.
#[test]
fn bucket_contains_range() {
fn prop(ix: u8) {
let index = BucketIndex(ix as usize);
let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(0));
let bucket_ref = KBucketRef {
index,
bucket: &mut bucket,
};
let (min, max) = bucket_ref.range();
assert!(min <= max);
assert!(bucket_ref.contains(&min));
assert!(bucket_ref.contains(&max));
assert!(!bucket_ref.contains(&Distance(min.0 - 1)));
// NOTE(review): for `ix == 255` the maximum is `U256::MAX`, so `max.0 + 1`
// exceeds the value range; whether that panics or wraps depends on the `U256`
// implementation, and whether `ix == 255` is ever generated depends on the
// quickcheck version — confirm.
assert!(!bucket_ref.contains(&Distance(max.0 + 1)));
}
quickcheck(prop as fn(_));
}
// A random distance drawn for bucket `ix` must lie within `[2^ix, 2^(ix+1) - 1]`.
#[test]
fn rand_distance() {
fn prop(ix: u8) -> bool {
let d = BucketIndex(ix as usize).rand_distance(&mut rand::thread_rng());
let n = U256::from(<[u8; 32]>::from(d.0));
let b = U256::from(2);
let e = U256::from(ix);
let lower = b.pow(e);
let upper = b.pow(e + U256::from(1)) - U256::from(1);
lower <= n && n <= upper
}
quickcheck(prop as fn(_) -> _);
}
// Inserting a key through an absent entry makes it the only key returned by
// `closest_keys`.
#[test]
fn entry_inserted() {
let local_key = Key::from(PeerId::random());
let other_id = Key::from(PeerId::random());
let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
if let Entry::Absent(entry) = table.entry(&other_id) {
match entry.insert((), NodeStatus::Connected) {
InsertResult::Inserted => (),
_ => panic!()
}
} else {
panic!()
}
let res = table.closest_keys(&other_id).collect::<Vec<_>>();
assert_eq!(res.len(), 1);
assert_eq!(res[0], other_id);
}
// Looking up the local key itself yields `Entry::SelfEntry`.
#[test]
fn entry_self() {
let local_key = Key::from(PeerId::random());
let mut table = KBucketsTable::<_, ()>::new(local_key.clone(), Duration::from_secs(5));
match table.entry(&local_key) {
Entry::SelfEntry => (),
_ => panic!(),
}
}
// `closest_keys` must return all inserted keys sorted by distance to the target.
#[test]
fn closest() {
let local_key = Key::from(PeerId::random());
let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
// Insert random keys until 100 insertions succeeded; other insert results
// (e.g. a full bucket) are skipped.
let mut count = 0;
loop {
if count == 100 { break; }
let key = Key::from(PeerId::random());
if let Entry::Absent(e) = table.entry(&key) {
match e.insert((), NodeStatus::Connected) {
InsertResult::Inserted => count += 1,
_ => continue,
}
} else {
panic!("entry exists")
}
}
let mut expected_keys: Vec<_> = table.buckets
.iter()
.flat_map(|t| t.iter().map(|(n,_)| n.key.clone()))
.collect();
for _ in 0 .. 10 {
let target_key = Key::from(PeerId::random());
let keys = table.closest_keys(&target_key).collect::<Vec<_>>();
expected_keys.sort_by_key(|k| k.distance(&target_key));
assert_eq!(keys, expected_keys);
}
}
// A pending entry whose ready time has passed must be applied on the next table
// access, replace the disconnected node it was pending against, and be reported
// exactly once through `take_applied_pending`.
#[test]
fn applied_pending() {
let local_key = Key::from(PeerId::random());
let mut table = KBucketsTable::<_, ()>::new(local_key.clone(), Duration::from_millis(1));
let expected_applied;
let full_bucket_index;
// Insert disconnected nodes until some bucket reports `Full`, then insert the
// same key again as connected, which is expected to leave it pending.
loop {
let key = Key::from(PeerId::random());
if let Entry::Absent(e) = table.entry(&key) {
match e.insert((), NodeStatus::Disconnected) {
InsertResult::Full => {
if let Entry::Absent(e) = table.entry(&key) {
match e.insert((), NodeStatus::Connected) {
InsertResult::Pending { disconnected } => {
expected_applied = AppliedPending {
inserted: Node { key: key.clone(), value: () },
evicted: Some(Node { key: disconnected, value: () })
};
full_bucket_index = BucketIndex::new(&key.distance(&local_key));
break
},
_ => panic!()
}
} else {
panic!()
}
},
_ => continue,
}
} else {
panic!("entry exists")
}
}
// Backdate the ready time of the pending entry by one second so that it no
// longer depends on the real timeout elapsing.
let full_bucket = &mut table.buckets[full_bucket_index.unwrap().get()];
let elapsed = Instant::now() - Duration::from_secs(1);
full_bucket.pending_mut().unwrap().set_ready_at(elapsed);
match table.entry(&expected_applied.inserted.key) {
Entry::Present(_, NodeStatus::Connected) => {}
x => panic!("Unexpected entry: {:?}", x)
}
match table.entry(&expected_applied.evicted.as_ref().unwrap().key) {
Entry::Absent(_) => {}
x => panic!("Unexpected entry: {:?}", x)
}
assert_eq!(Some(expected_applied), table.take_applied_pending());
assert_eq!(None, table.take_applied_pending());
}
// Flipping a single bit of the target distance must move the target closer
// (and not increase the count) when the bit was set, and farther away (and not
// decrease the count) when the bit was unset.
#[test]
fn count_nodes_between() {
fn prop(mut table: TestTable, target: Key<PeerId>) -> bool {
let num_to_target = table.count_nodes_between(&target);
let distance = table.local_key.distance(&target);
let base2 = U256::from(2);
let mut iter = ClosestBucketsIter::new(distance);
iter.all(|i| {
// `d` is the target distance with bit `i` flipped; `k` is a key at that distance.
let d = Distance(distance.0 ^ (base2.pow(U256::from(i.get()))));
let k = table.local_key.for_distance(d);
if distance.0.bit(i.get()) {
d < distance && table.count_nodes_between(&k) <= num_to_target
} else {
d > distance && table.count_nodes_between(&k) >= num_to_target
}
})
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_,_) -> _)
}
}