mod test;
use crate::K_VALUE;
use crate::addresses::Addresses;
use crate::handler::{
KademliaHandlerProto,
KademliaHandlerConfig,
KademliaRequestId,
KademliaHandlerEvent,
KademliaHandlerIn
};
use crate::jobs::*;
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
use fnv::{FnvHashMap, FnvHashSet};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
use libp2p_swarm::{
DialPeerCondition,
NetworkBehaviour,
NetworkBehaviourAction,
NotifyHandler,
PollParameters,
};
use log::{info, debug, warn};
use smallvec::SmallVec;
use std::{borrow::Cow, error, iter, time::Duration};
use std::collections::{HashSet, VecDeque};
use std::fmt;
use std::num::NonZeroUsize;
use std::task::{Context, Poll};
use std::vec;
use wasm_timer::Instant;
pub use crate::query::QueryStats;
pub struct Kademlia<TStore> {
kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
kbucket_inserts: KademliaBucketInserts,
protocol_config: KademliaProtocolConfig,
queries: QueryPool<QueryInner>,
connected_peers: FnvHashSet<PeerId>,
add_provider_job: Option<AddProviderJob>,
put_record_job: Option<PutRecordJob>,
record_ttl: Option<Duration>,
provider_record_ttl: Option<Duration>,
connection_idle_timeout: Duration,
queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,
local_addrs: HashSet<Multiaddr>,
store: TStore,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum KademliaBucketInserts {
OnConnected,
Manual,
}
#[derive(Debug, Clone)]
pub struct KademliaConfig {
kbucket_pending_timeout: Duration,
query_config: QueryConfig,
protocol_config: KademliaProtocolConfig,
record_ttl: Option<Duration>,
record_replication_interval: Option<Duration>,
record_publication_interval: Option<Duration>,
provider_record_ttl: Option<Duration>,
provider_publication_interval: Option<Duration>,
connection_idle_timeout: Duration,
kbucket_inserts: KademliaBucketInserts,
}
impl Default for KademliaConfig {
fn default() -> Self {
KademliaConfig {
kbucket_pending_timeout: Duration::from_secs(60),
query_config: QueryConfig::default(),
protocol_config: Default::default(),
record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
record_replication_interval: Some(Duration::from_secs(60 * 60)),
record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
connection_idle_timeout: Duration::from_secs(10),
kbucket_inserts: KademliaBucketInserts::OnConnected,
}
}
}
impl KademliaConfig {
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
self.protocol_config.set_protocol_name(name);
self
}
pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
self.query_config.timeout = timeout;
self
}
pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
self.query_config.replication_factor = replication_factor;
self
}
pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
self.query_config.parallelism = parallelism;
self
}
pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
self.query_config.disjoint_query_paths = enabled;
self
}
pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
self.record_ttl = record_ttl;
self
}
pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
self.record_replication_interval = interval;
self
}
pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
self.record_publication_interval = interval;
self
}
pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
self.provider_record_ttl = ttl;
self
}
pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
self.provider_publication_interval = interval;
self
}
pub fn set_connection_idle_timeout(&mut self, duration: Duration) -> &mut Self {
self.connection_idle_timeout = duration;
self
}
pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
self.protocol_config.set_max_packet_size(size);
self
}
pub fn set_kbucket_inserts(&mut self, inserts: KademliaBucketInserts) -> &mut Self {
self.kbucket_inserts = inserts;
self
}
}
impl<TStore> Kademlia<TStore>
where
for<'a> TStore: RecordStore<'a>
{
pub fn new(id: PeerId, store: TStore) -> Self {
Self::with_config(id, store, Default::default())
}
pub fn protocol_name(&self) -> &[u8] {
self.protocol_config.protocol_name()
}
pub fn with_config(id: PeerId, store: TStore, config: KademliaConfig) -> Self {
let local_key = kbucket::Key::from(id);
let put_record_job = config
.record_replication_interval
.or(config.record_publication_interval)
.map(|interval| PutRecordJob::new(
id,
interval,
config.record_publication_interval,
config.record_ttl,
));
let add_provider_job = config
.provider_publication_interval
.map(AddProviderJob::new);
Kademlia {
store,
kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
kbucket_inserts: config.kbucket_inserts,
protocol_config: config.protocol_config,
queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
queries: QueryPool::new(config.query_config),
connected_peers: Default::default(),
add_provider_job,
put_record_job,
record_ttl: config.record_ttl,
provider_record_ttl: config.provider_record_ttl,
connection_idle_timeout: config.connection_idle_timeout,
local_addrs: HashSet::new()
}
}
pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
self.queries.iter().filter_map(|query|
if !query.is_finished() {
Some(QueryRef { query })
} else {
None
})
}
pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
self.queries.iter_mut().filter_map(|query|
if !query.is_finished() {
Some(QueryMut { query })
} else {
None
})
}
pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
self.queries.get(id).and_then(|query|
if !query.is_finished() {
Some(QueryRef { query })
} else {
None
})
}
pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
self.queries.get_mut(id).and_then(|query|
if !query.is_finished() {
Some(QueryMut { query })
} else {
None
})
}
pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
let key = kbucket::Key::from(*peer);
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, _) => {
if entry.value().insert(address) {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::RoutingUpdated {
peer: *peer,
addresses: entry.value().clone(),
old_peer: None,
}
))
}
RoutingUpdate::Success
}
kbucket::Entry::Pending(mut entry, _) => {
entry.value().insert(address);
RoutingUpdate::Pending
}
kbucket::Entry::Absent(entry) => {
let addresses = Addresses::new(address);
let status =
if self.connected_peers.contains(peer) {
NodeStatus::Connected
} else {
NodeStatus::Disconnected
};
match entry.insert(addresses.clone(), status) {
kbucket::InsertResult::Inserted => {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::RoutingUpdated {
peer: *peer,
addresses,
old_peer: None,
}
));
RoutingUpdate::Success
},
kbucket::InsertResult::Full => {
debug!("Bucket full. Peer not added to routing table: {}", peer);
RoutingUpdate::Failed
},
kbucket::InsertResult::Pending { disconnected } => {
self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: disconnected.into_preimage(),
condition: DialPeerCondition::Disconnected
});
RoutingUpdate::Pending
},
}
},
kbucket::Entry::SelfEntry => RoutingUpdate::Failed,
}
}
pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr)
-> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>>
{
let key = kbucket::Key::from(*peer);
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, _) => {
if entry.value().remove(address).is_err() {
Some(entry.remove())
} else {
None
}
}
kbucket::Entry::Pending(mut entry, _) => {
if entry.value().remove(address).is_err() {
Some(entry.remove())
} else {
None
}
}
kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
None
}
}
}
pub fn remove_peer(&mut self, peer: &PeerId)
-> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>>
{
let key = kbucket::Key::from(*peer);
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(entry, _) => {
Some(entry.remove())
}
kbucket::Entry::Pending(entry, _) => {
Some(entry.remove())
}
kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
None
}
}
}
pub fn kbuckets(&mut self)
-> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
{
self.kbuckets.iter().filter(|b| !b.is_empty())
}
pub fn kbucket<K>(&mut self, key: K)
-> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
where
K: Into<kbucket::Key<K>> + Clone
{
self.kbuckets.bucket(&key.into())
}
pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
where
K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone
{
let info = QueryInfo::GetClosestPeers { key: key.clone().into() };
let target: kbucket::Key<K> = key.into();
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
self.queries.add_iter_closest(target.clone(), peers, inner)
}
pub fn get_record(&mut self, key: &record::Key, quorum: Quorum) -> QueryId {
let quorum = quorum.eval(self.queries.config().replication_factor);
let mut records = Vec::with_capacity(quorum.get());
if let Some(record) = self.store.get(key) {
if record.is_expired(Instant::now()) {
self.store.remove(key)
} else {
records.push(PeerRecord{ peer: None, record: record.into_owned()});
}
}
let done = records.len() >= quorum.get();
let target = kbucket::Key::new(key.clone());
let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None };
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
let id = self.queries.add_iter_closest(target.clone(), peers, inner);
if done {
self.queries.get_mut(&id).expect("by (*)").finish();
}
id
}
pub fn put_record(&mut self, mut record: Record, quorum: Quorum) -> Result<QueryId, store::Error> {
record.publisher = Some(*self.kbuckets.local_key().preimage());
self.store.put(record.clone())?;
record.expires = record.expires.or_else(||
self.record_ttl.map(|ttl| Instant::now() + ttl));
let quorum = quorum.eval(self.queries.config().replication_factor);
let target = kbucket::Key::new(record.key.clone());
let peers = self.kbuckets.closest_keys(&target);
let context = PutRecordContext::Publish;
let info = QueryInfo::PutRecord {
context,
record,
quorum,
phase: PutRecordPhase::GetClosestPeers
};
let inner = QueryInner::new(info);
Ok(self.queries.add_iter_closest(target.clone(), peers, inner))
}
pub fn remove_record(&mut self, key: &record::Key) {
if let Some(r) = self.store.get(key) {
if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
self.store.remove(key)
}
}
}
pub fn store_mut(&mut self) -> &mut TStore {
&mut self.store
}
pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
let local_key = self.kbuckets.local_key().clone();
let info = QueryInfo::Bootstrap {
peer: *local_key.preimage(),
remaining: None
};
let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
if peers.is_empty() {
Err(NoKnownPeers())
} else {
let inner = QueryInner::new(info);
Ok(self.queries.add_iter_closest(local_key, peers, inner))
}
}
pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
let local_addrs = Vec::new();
let record = ProviderRecord::new(
key.clone(),
*self.kbuckets.local_key().preimage(),
local_addrs);
self.store.add_provider(record)?;
let target = kbucket::Key::new(key.clone());
let peers = self.kbuckets.closest_keys(&target);
let context = AddProviderContext::Publish;
let info = QueryInfo::AddProvider {
context,
key,
phase: AddProviderPhase::GetClosestPeers
};
let inner = QueryInner::new(info);
let id = self.queries.add_iter_closest(target.clone(), peers, inner);
Ok(id)
}
pub fn stop_providing(&mut self, key: &record::Key) {
self.store.remove_provider(key, self.kbuckets.local_key().preimage());
}
pub fn get_providers(&mut self, key: record::Key) -> QueryId {
let info = QueryInfo::GetProviders {
key: key.clone(),
providers: HashSet::new(),
};
let target = kbucket::Key::new(key);
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
self.queries.add_iter_closest(target.clone(), peers, inner)
}
fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
where
I: Iterator<Item = &'a KadPeer> + Clone
{
let local_id = self.kbuckets.local_key().preimage();
let others_iter = peers.filter(|p| &p.node_id != local_id);
if let Some(query) = self.queries.get_mut(query_id) {
log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id);
for peer in others_iter.clone() {
log::trace!("Peer {:?} reported by {:?} in query {:?}.",
peer, source, query_id);
let addrs = peer.multiaddrs.iter().cloned().collect();
query.inner.addresses.insert(peer.node_id, addrs);
}
query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
}
}
fn find_closest<T: Clone>(&mut self, target: &kbucket::Key<T>, source: &PeerId) -> Vec<KadPeer> {
if target == self.kbuckets.local_key() {
Vec::new()
} else {
self.kbuckets
.closest(target)
.filter(|e| e.node.key.preimage() != source)
.take(self.queries.config().replication_factor.get())
.map(KadPeer::from)
.collect()
}
}
fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
let kbuckets = &mut self.kbuckets;
let connected = &mut self.connected_peers;
let local_addrs = &self.local_addrs;
self.store.providers(key)
.into_iter()
.filter_map(move |p|
if &p.provider != source {
let node_id = p.provider;
let multiaddrs = p.addresses;
let connection_ty = if connected.contains(&node_id) {
KadConnectionType::Connected
} else {
KadConnectionType::NotConnected
};
if multiaddrs.is_empty() {
if &node_id == kbuckets.local_key().preimage() {
Some(local_addrs.iter().cloned().collect::<Vec<_>>())
} else {
let key = kbucket::Key::from(node_id);
kbuckets.entry(&key).view().map(|e| e.node.value.clone().into_vec())
}
} else {
Some(multiaddrs)
}
.map(|multiaddrs| {
KadPeer {
node_id,
multiaddrs,
connection_ty,
}
})
} else {
None
})
.take(self.queries.config().replication_factor.get())
.collect()
}
fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
let info = QueryInfo::AddProvider {
context,
key: key.clone(),
phase: AddProviderPhase::GetClosestPeers
};
let target = kbucket::Key::new(key);
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
self.queries.add_iter_closest(target.clone(), peers, inner);
}
fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
let quorum = quorum.eval(self.queries.config().replication_factor);
let target = kbucket::Key::new(record.key.clone());
let peers = self.kbuckets.closest_keys(&target);
let info = QueryInfo::PutRecord {
record, quorum, context, phase: PutRecordPhase::GetClosestPeers
};
let inner = QueryInner::new(info);
self.queries.add_iter_closest(target.clone(), peers, inner);
}
fn connection_updated(&mut self, peer: PeerId, address: Option<Multiaddr>, new_status: NodeStatus) {
let key = kbucket::Key::from(peer);
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, old_status) => {
if let Some(address) = address {
if entry.value().insert(address) {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::RoutingUpdated {
peer,
addresses: entry.value().clone(),
old_peer: None,
}
))
}
}
if old_status != new_status {
entry.update(new_status);
}
},
kbucket::Entry::Pending(mut entry, old_status) => {
if let Some(address) = address {
entry.value().insert(address);
}
if old_status != new_status {
entry.update(new_status);
}
},
kbucket::Entry::Absent(entry) => {
if new_status != NodeStatus::Connected {
return
}
match (address, self.kbucket_inserts) {
(None, _) => {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::UnroutablePeer { peer }
));
}
(Some(a), KademliaBucketInserts::Manual) => {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::RoutablePeer { peer, address: a }
));
}
(Some(a), KademliaBucketInserts::OnConnected) => {
let addresses = Addresses::new(a);
match entry.insert(addresses.clone(), new_status) {
kbucket::InsertResult::Inserted => {
let event = KademliaEvent::RoutingUpdated {
peer,
addresses,
old_peer: None,
};
self.queued_events.push_back(
NetworkBehaviourAction::GenerateEvent(event));
},
kbucket::InsertResult::Full => {
debug!("Bucket full. Peer not added to routing table: {}", peer);
let address = addresses.first().clone();
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::RoutablePeer { peer, address }
));
},
kbucket::InsertResult::Pending { disconnected } => {
debug_assert!(!self.connected_peers.contains(disconnected.preimage()));
let address = addresses.first().clone();
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::PendingRoutablePeer { peer, address }
));
self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: disconnected.into_preimage(),
condition: DialPeerCondition::Disconnected
})
},
}
}
}
},
_ => {}
}
}
fn query_finished(&mut self, q: Query<QueryInner>, params: &mut impl PollParameters)
-> Option<KademliaEvent>
{
let query_id = q.id();
log::trace!("Query {:?} finished.", query_id);
let result = q.into_result();
match result.inner.info {
QueryInfo::Bootstrap { peer, remaining } => {
let local_key = self.kbuckets.local_key().clone();
let mut remaining = remaining.unwrap_or_else(|| {
debug_assert_eq!(&peer, local_key.preimage());
self.kbuckets.iter()
.skip_while(|b| b.is_empty())
.skip(1)
.map(|b| {
let mut target = kbucket::Key::from(PeerId::random());
for _ in 0 .. 16 {
let d = local_key.distance(&target);
if b.contains(&d) {
break;
}
target = kbucket::Key::from(PeerId::random());
}
target
}).collect::<Vec<_>>().into_iter()
});
let num_remaining = remaining.len().saturating_sub(1) as u32;
if let Some(target) = remaining.next() {
let info = QueryInfo::Bootstrap {
peer: target.clone().into_preimage(),
remaining: Some(remaining)
};
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
}
Some(KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
result: QueryResult::Bootstrap(Ok(BootstrapOk { peer, num_remaining }))
})
}
QueryInfo::GetClosestPeers { key, .. } => {
Some(KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
result: QueryResult::GetClosestPeers(Ok(
GetClosestPeersOk { key, peers: result.peers.collect() }
))
})
}
QueryInfo::GetProviders { key, providers } => {
Some(KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
result: QueryResult::GetProviders(Ok(
GetProvidersOk {
key,
providers,
closest_peers: result.peers.collect()
}
))
})
}
QueryInfo::AddProvider {
context,
key,
phase: AddProviderPhase::GetClosestPeers
} => {
let provider_id = *params.local_peer_id();
let external_addresses = params.external_addresses().map(|r| r.addr).collect();
let inner = QueryInner::new(QueryInfo::AddProvider {
context,
key,
phase: AddProviderPhase::AddProvider {
provider_id,
external_addresses,
get_closest_peers_stats: result.stats
}
});
self.queries.continue_fixed(query_id, result.peers, inner);
None
}
QueryInfo::AddProvider {
context,
key,
phase: AddProviderPhase::AddProvider { get_closest_peers_stats, .. }
} => {
match context {
AddProviderContext::Publish => {
Some(KademliaEvent::QueryResult {
id: query_id,
stats: get_closest_peers_stats.merge(result.stats),
result: QueryResult::StartProviding(Ok(AddProviderOk { key }))
})
}
AddProviderContext::Republish => {
Some(KademliaEvent::QueryResult {
id: query_id,
stats: get_closest_peers_stats.merge(result.stats),
result: QueryResult::RepublishProvider(Ok(AddProviderOk { key }))
})
}
}
}
QueryInfo::GetRecord { key, records, quorum, cache_at } => {
let results = if records.len() >= quorum.get() {
if let Some(cache_key) = cache_at {
let record = records.first().expect("[not empty]").record.clone();
let quorum = NonZeroUsize::new(1).expect("1 > 0");
let context = PutRecordContext::Cache;
let info = QueryInfo::PutRecord {
context,
record,
quorum,
phase: PutRecordPhase::PutRecord {
success: vec![],
get_closest_peers_stats: QueryStats::empty()
}
};
let inner = QueryInner::new(info);
self.queries.add_fixed(iter::once(cache_key.into_preimage()), inner);
}
Ok(GetRecordOk { records })
} else if records.is_empty() {
Err(GetRecordError::NotFound {
key,
closest_peers: result.peers.collect()
})
} else {
Err(GetRecordError::QuorumFailed { key, records, quorum })
};
Some(KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
result: QueryResult::GetRecord(results)
})
}
QueryInfo::PutRecord {
context,
record,
quorum,
phase: PutRecordPhase::GetClosestPeers
} => {
let info = QueryInfo::PutRecord {
context,
record,
quorum,
phase: PutRecordPhase::PutRecord {
success: vec![],
get_closest_peers_stats: result.stats
}
};
let inner = QueryInner::new(info);
self.queries.continue_fixed(query_id, result.peers, inner);
None
}
QueryInfo::PutRecord {
context,
record,
quorum,
phase: PutRecordPhase::PutRecord { success, get_closest_peers_stats }
} => {
let mk_result = |key: record::Key| {
if success.len() >= quorum.get() {
Ok(PutRecordOk { key })
} else {
Err(PutRecordError::QuorumFailed { key, quorum, success })
}
};
match context {
PutRecordContext::Publish =>
Some(KademliaEvent::QueryResult {
id: query_id,
stats: get_closest_peers_stats.merge(result.stats),
result: QueryResult::PutRecord(mk_result(record.key))
}),
PutRecordContext::Republish =>
Some(KademliaEvent::QueryResult {
id: query_id,
stats: get_closest_peers_stats.merge(result.stats),
result: QueryResult::RepublishRecord(mk_result(record.key))
}),
PutRecordContext::Replicate => {
debug!("Record replicated: {:?}", record.key);
None
}
PutRecordContext::Cache => {
debug!("Record cached: {:?}", record.key);
None
}
}
}
}
}
fn query_timeout(&mut self, query: Query<QueryInner>) -> Option<KademliaEvent> {
let query_id = query.id();
log::trace!("Query {:?} timed out.", query_id);
let result = query.into_result();
match result.inner.info {
QueryInfo::Bootstrap { peer, mut remaining } => {
let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
if let Some(mut remaining) = remaining.take() {
if let Some(target) = remaining.next() {
let info = QueryInfo::Bootstrap {
peer: target.clone().into_preimage(),
remaining: Some(remaining)
};
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
}
}
Some(KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
result: QueryResult::Bootstrap(Err(
BootstrapError::Timeout { peer, num_remaining }
))
})
}
QueryInfo::AddProvider { context, key, .. } =>
Some(match context {
AddProviderContext::Publish =>
KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
result: QueryResult::StartProviding(Err(
AddProviderError::Timeout { key }
))
},
AddProviderContext::Republish =>
KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
result: QueryResult::RepublishProvider(Err(
AddProviderError::Timeout { key }
))
}
}),
QueryInfo::GetClosestPeers { key } => {
Some(KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
result: QueryResult::GetClosestPeers(Err(
GetClosestPeersError::Timeout {
key,
peers: result.peers.collect()
}
))
})
},
QueryInfo::PutRecord { record, quorum, context, phase } => {
let err = Err(PutRecordError::Timeout {
key: record.key,
quorum,
success: match phase {
PutRecordPhase::GetClosestPeers => vec![],
PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
}
});
match context {
PutRecordContext::Publish =>
Some(KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
result: QueryResult::PutRecord(err)
}),
PutRecordContext::Republish =>
Some(KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
result: QueryResult::RepublishRecord(err)
}),
PutRecordContext::Replicate => match phase {
PutRecordPhase::GetClosestPeers => {
warn!("Locating closest peers for replication failed: {:?}", err);
None
}
PutRecordPhase::PutRecord { .. } => {
debug!("Replicating record failed: {:?}", err);
None
}
}
PutRecordContext::Cache => match phase {
PutRecordPhase::GetClosestPeers => {
unreachable!()
}
PutRecordPhase::PutRecord { .. } => {
debug!("Caching record failed: {:?}", err);
None
}
}
}
}
QueryInfo::GetRecord { key, records, quorum, .. } =>
Some(KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
result: QueryResult::GetRecord(Err(
GetRecordError::Timeout { key, records, quorum },
))
}),
QueryInfo::GetProviders { key, providers } =>
Some(KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
result: QueryResult::GetProviders(Err(
GetProvidersError::Timeout {
key,
providers,
closest_peers: result.peers.collect()
}
))
})
}
}
fn record_received(
&mut self,
source: PeerId,
connection: ConnectionId,
request_id: KademliaRequestId,
mut record: Record
) {
if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::PutRecordRes {
key: record.key,
value: record.value,
request_id,
},
});
return
}
let now = Instant::now();
let target = kbucket::Key::new(record.key.clone());
let num_between = self.kbuckets.count_nodes_between(&target);
let k = self.queries.config().replication_factor.get();
let num_beyond_k = (usize::max(k, num_between) - k) as u32;
let expiration = self.record_ttl.map(|ttl| now + exp_decrease(ttl, num_beyond_k));
record.expires = record.expires.or(expiration).min(expiration);
if let Some(job) = self.put_record_job.as_mut() {
job.skip(record.key.clone())
}
if !record.is_expired(now) {
match self.store.put(record.clone()) {
Ok(()) => debug!("Record stored: {:?}; {} bytes", record.key, record.value.len()),
Err(e) => {
info!("Record not stored: {:?}", e);
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::Reset(request_id)
});
return
}
}
}
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::PutRecordRes {
key: record.key,
value: record.value,
request_id,
},
})
}
fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
if &provider.node_id != self.kbuckets.local_key().preimage() {
let record = ProviderRecord {
key,
provider: provider.node_id,
expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
addresses: provider.multiaddrs,
};
if let Err(e) = self.store.add_provider(record) {
info!("Provider record not stored: {:?}", e);
}
}
}
}
fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
}
impl<TStore> NetworkBehaviour for Kademlia<TStore>
where
for<'a> TStore: RecordStore<'a>,
TStore: Send + 'static,
{
type ProtocolsHandler = KademliaHandlerProto<QueryId>;
type OutEvent = KademliaEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
KademliaHandlerProto::new(KademliaHandlerConfig {
protocol_config: self.protocol_config.clone(),
allow_listening: true,
idle_timeout: self.connection_idle_timeout,
})
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
let key = kbucket::Key::from(*peer_id);
let mut peer_addrs =
if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
addrs
} else {
Vec::new()
};
for query in self.queries.iter() {
if let Some(addrs) = query.inner.addresses.get(peer_id) {
peer_addrs.extend(addrs.iter().cloned())
}
}
peer_addrs
}
fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {
}
fn inject_connected(&mut self, peer: &PeerId) {
for (peer_id, event) in self.queries.iter_mut().filter_map(|q|
q.inner.pending_rpcs.iter()
.position(|(p, _)| p == peer)
.map(|p| q.inner.pending_rpcs.remove(p)))
{
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id, event, handler: NotifyHandler::Any
});
}
self.connected_peers.insert(*peer);
}
fn inject_address_change(
&mut self,
peer: &PeerId,
_: &ConnectionId,
old: &ConnectedPoint,
new: &ConnectedPoint
) {
let (old, new) = (old.get_remote_address(), new.get_remote_address());
if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::from(*peer)).value() {
if addrs.replace(old, new) {
debug!("Address '{}' replaced with '{}' for peer '{}'.", old, new, peer);
} else {
debug!(
"Address '{}' not replaced with '{}' for peer '{}' as old address wasn't \
present.",
old, new, peer,
);
}
} else {
debug!(
"Address '{}' not replaced with '{}' for peer '{}' as peer is not present in the \
routing table.",
old, new, peer,
);
}
for query in self.queries.iter_mut() {
if let Some(addrs) = query.inner.addresses.get_mut(peer) {
for addr in addrs.iter_mut() {
if addr == old {
*addr = new.clone();
}
}
}
}
}
fn inject_addr_reach_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
err: &dyn error::Error
) {
if let Some(peer_id) = peer_id {
let key = kbucket::Key::from(*peer_id);
if let Some(addrs) = self.kbuckets.entry(&key).value() {
if addrs.remove(addr).is_ok() {
debug!("Address '{}' removed from peer '{}' due to error: {}.",
addr, peer_id, err);
} else {
debug!("Last remaining address '{}' of peer '{}' is unreachable: {}.",
addr, peer_id, err)
}
}
for query in self.queries.iter_mut() {
if let Some(addrs) = query.inner.addresses.get_mut(peer_id) {
addrs.retain(|a| a != addr);
}
}
}
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
for query in self.queries.iter_mut() {
query.on_failure(peer_id);
}
}
fn inject_disconnected(&mut self, id: &PeerId) {
for query in self.queries.iter_mut() {
query.on_failure(id);
}
self.connection_updated(*id, None, NodeStatus::Disconnected);
self.connected_peers.remove(id);
}
fn inject_event(
&mut self,
source: PeerId,
connection: ConnectionId,
event: KademliaHandlerEvent<QueryId>
) {
match event {
KademliaHandlerEvent::ProtocolConfirmed { endpoint } => {
debug_assert!(self.connected_peers.contains(&source));
let address = match endpoint {
ConnectedPoint::Dialer { address } => Some(address),
ConnectedPoint::Listener { .. } => None,
};
self.connection_updated(source, address, NodeStatus::Connected);
}
KademliaHandlerEvent::FindNodeReq { key, request_id } => {
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::FindNodeRes {
closer_peers,
request_id,
},
});
}
KademliaHandlerEvent::FindNodeRes {
closer_peers,
user_data,
} => {
self.discovered(&user_data, &source, closer_peers.iter());
}
KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
let provider_peers = self.provider_peers(&key, &source);
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::GetProvidersRes {
closer_peers,
provider_peers,
request_id,
},
});
}
KademliaHandlerEvent::GetProvidersRes {
closer_peers,
provider_peers,
user_data,
} => {
let peers = closer_peers.iter().chain(provider_peers.iter());
self.discovered(&user_data, &source, peers);
if let Some(query) = self.queries.get_mut(&user_data) {
if let QueryInfo::GetProviders {
providers, ..
} = &mut query.inner.info {
for peer in provider_peers {
providers.insert(peer.node_id);
}
}
}
}
KademliaHandlerEvent::QueryError { user_data, error } => {
log::debug!("Request to {:?} in query {:?} failed with {:?}",
source, user_data, error);
if let Some(query) = self.queries.get_mut(&user_data) {
query.on_failure(&source)
}
}
KademliaHandlerEvent::AddProvider { key, provider } => {
if provider.node_id != source {
return
}
self.provider_received(key, provider)
}
KademliaHandlerEvent::GetRecord { key, request_id } => {
let record = match self.store.get(&key) {
Some(record) => {
if record.is_expired(Instant::now()) {
self.store.remove(&key);
None
} else {
Some(record.into_owned())
}
},
None => None
};
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,
handler: NotifyHandler::One(connection),
event: KademliaHandlerIn::GetRecordRes {
record,
closer_peers,
request_id,
},
});
}
KademliaHandlerEvent::GetRecordRes {
record,
closer_peers,
user_data,
} => {
if let Some(query) = self.queries.get_mut(&user_data) {
if let QueryInfo::GetRecord {
key, records, quorum, cache_at
} = &mut query.inner.info {
if let Some(record) = record {
records.push(PeerRecord{ peer: Some(source), record });
let quorum = quorum.get();
if records.len() >= quorum {
let peers = records.iter()
.filter_map(|PeerRecord{ peer, .. }| peer.as_ref())
.cloned()
.collect::<Vec<_>>();
let finished = query.try_finish(peers.iter());
if !finished {
debug!(
"GetRecord query ({:?}) reached quorum ({}/{}) with \
response from peer {} but could not yet finish.",
user_data, peers.len(), quorum, source,
);
}
}
} else if quorum.get() == 1 {
let source_key = kbucket::Key::from(source);
if let Some(cache_key) = cache_at {
let key = kbucket::Key::new(key.clone());
if source_key.distance(&key) < cache_key.distance(&key) {
*cache_at = Some(source_key)
}
} else {
*cache_at = Some(source_key)
}
}
}
}
self.discovered(&user_data, &source, closer_peers.iter());
}
KademliaHandlerEvent::PutRecord {
record,
request_id
} => {
self.record_received(source, connection, request_id, record);
}
KademliaHandlerEvent::PutRecordRes {
user_data, ..
} => {
if let Some(query) = self.queries.get_mut(&user_data) {
query.on_success(&source, vec![]);
if let QueryInfo::PutRecord {
phase: PutRecordPhase::PutRecord { success, .. }, quorum, ..
} = &mut query.inner.info {
success.push(source);
let quorum = quorum.get();
if success.len() >= quorum {
let peers = success.clone();
let finished = query.try_finish(peers.iter());
if !finished {
debug!(
"PutRecord query ({:?}) reached quorum ({}/{}) with response \
from peer {} but could not yet finish.",
user_data, peers.len(), quorum, source,
);
}
}
}
}
}
};
}
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
self.local_addrs.insert(addr.clone());
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
self.local_addrs.remove(addr);
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
if self.local_addrs.len() < MAX_LOCAL_EXTERNAL_ADDRS {
self.local_addrs.insert(addr.clone());
}
}
fn poll(&mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters) -> Poll<
NetworkBehaviourAction<
KademliaHandlerIn<QueryId>,
Self::OutEvent,
>,
> {
let now = Instant::now();
let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
if let Some(mut job) = self.add_provider_job.take() {
let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
for _ in 0 .. num {
if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
self.start_add_provider(r.key, AddProviderContext::Republish)
} else {
break
}
}
jobs_query_capacity -= num;
self.add_provider_job = Some(job);
}
if let Some(mut job) = self.put_record_job.take() {
let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
for _ in 0 .. num {
if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
let context = if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
PutRecordContext::Republish
} else {
PutRecordContext::Replicate
};
self.start_put_record(r, Quorum::All, context)
} else {
break
}
}
self.put_record_job = Some(job);
}
loop {
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event);
}
if let Some(entry) = self.kbuckets.take_applied_pending() {
let kbucket::Node { key, value } = entry.inserted;
let event = KademliaEvent::RoutingUpdated {
peer: key.into_preimage(),
addresses: value,
old_peer: entry.evicted.map(|n| n.key.into_preimage())
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
}
loop {
match self.queries.poll(now) {
QueryPoolState::Finished(q) => {
if let Some(event) = self.query_finished(q, parameters) {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
}
}
QueryPoolState::Timeout(q) => {
if let Some(event) = self.query_timeout(q) {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
}
}
QueryPoolState::Waiting(Some((query, peer_id))) => {
let event = query.inner.info.to_request(query.id());
if let QueryInfo::AddProvider {
phase: AddProviderPhase::AddProvider { .. },
..
} = &query.inner.info {
query.on_success(&peer_id, vec![])
}
if self.connected_peers.contains(&peer_id) {
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id, event, handler: NotifyHandler::Any
});
} else if &peer_id != self.kbuckets.local_key().preimage() {
query.inner.pending_rpcs.push((peer_id, event));
self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
peer_id, condition: DialPeerCondition::Disconnected
});
}
}
QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
}
}
if self.queued_events.is_empty() {
return Poll::Pending
}
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Quorum {
One,
Majority,
All,
N(NonZeroUsize)
}
impl Quorum {
fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
match self {
Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
Quorum::All => total,
Quorum::N(n) => NonZeroUsize::min(total, *n)
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
pub peer: Option<PeerId>,
pub record: Record,
}
#[derive(Debug)]
pub enum KademliaEvent {
QueryResult {
id: QueryId,
result: QueryResult,
stats: QueryStats
},
RoutingUpdated {
peer: PeerId,
addresses: Addresses,
old_peer: Option<PeerId>,
},
UnroutablePeer {
peer: PeerId
},
RoutablePeer {
peer: PeerId,
address: Multiaddr,
},
PendingRoutablePeer {
peer: PeerId,
address: Multiaddr,
}
}
#[derive(Debug)]
pub enum QueryResult {
Bootstrap(BootstrapResult),
GetClosestPeers(GetClosestPeersResult),
GetProviders(GetProvidersResult),
StartProviding(AddProviderResult),
RepublishProvider(AddProviderResult),
GetRecord(GetRecordResult),
PutRecord(PutRecordResult),
RepublishRecord(PutRecordResult),
}
pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
#[derive(Debug, Clone)]
pub struct GetRecordOk {
pub records: Vec<PeerRecord>
}
#[derive(Debug, Clone)]
pub enum GetRecordError {
NotFound {
key: record::Key,
closest_peers: Vec<PeerId>
},
QuorumFailed {
key: record::Key,
records: Vec<PeerRecord>,
quorum: NonZeroUsize
},
Timeout {
key: record::Key,
records: Vec<PeerRecord>,
quorum: NonZeroUsize
}
}
impl GetRecordError {
pub fn key(&self) -> &record::Key {
match self {
GetRecordError::QuorumFailed { key, .. } => key,
GetRecordError::Timeout { key, .. } => key,
GetRecordError::NotFound { key, .. } => key,
}
}
pub fn into_key(self) -> record::Key {
match self {
GetRecordError::QuorumFailed { key, .. } => key,
GetRecordError::Timeout { key, .. } => key,
GetRecordError::NotFound { key, .. } => key,
}
}
}
pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
#[derive(Debug, Clone)]
pub struct PutRecordOk {
pub key: record::Key
}
#[derive(Debug)]
pub enum PutRecordError {
QuorumFailed {
key: record::Key,
success: Vec<PeerId>,
quorum: NonZeroUsize
},
Timeout {
key: record::Key,
success: Vec<PeerId>,
quorum: NonZeroUsize
},
}
impl PutRecordError {
pub fn key(&self) -> &record::Key {
match self {
PutRecordError::QuorumFailed { key, .. } => key,
PutRecordError::Timeout { key, .. } => key,
}
}
pub fn into_key(self) -> record::Key {
match self {
PutRecordError::QuorumFailed { key, .. } => key,
PutRecordError::Timeout { key, .. } => key,
}
}
}
pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
#[derive(Debug, Clone)]
pub struct BootstrapOk {
pub peer: PeerId,
pub num_remaining: u32,
}
#[derive(Debug, Clone)]
pub enum BootstrapError {
Timeout {
peer: PeerId,
num_remaining: Option<u32>,
}
}
pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
#[derive(Debug, Clone)]
pub struct GetClosestPeersOk {
pub key: Vec<u8>,
pub peers: Vec<PeerId>
}
#[derive(Debug, Clone)]
pub enum GetClosestPeersError {
Timeout {
key: Vec<u8>,
peers: Vec<PeerId>
}
}
impl GetClosestPeersError {
pub fn key(&self) -> &Vec<u8> {
match self {
GetClosestPeersError::Timeout { key, .. } => key,
}
}
pub fn into_key(self) -> Vec<u8> {
match self {
GetClosestPeersError::Timeout { key, .. } => key,
}
}
}
pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
#[derive(Debug, Clone)]
pub struct GetProvidersOk {
pub key: record::Key,
pub providers: HashSet<PeerId>,
pub closest_peers: Vec<PeerId>
}
#[derive(Debug, Clone)]
pub enum GetProvidersError {
Timeout {
key: record::Key,
providers: HashSet<PeerId>,
closest_peers: Vec<PeerId>
}
}
impl GetProvidersError {
pub fn key(&self) -> &record::Key {
match self {
GetProvidersError::Timeout { key, .. } => key,
}
}
pub fn into_key(self) -> record::Key {
match self {
GetProvidersError::Timeout { key, .. } => key,
}
}
}
pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
#[derive(Debug, Clone)]
pub struct AddProviderOk {
pub key: record::Key,
}
#[derive(Debug)]
pub enum AddProviderError {
Timeout {
key: record::Key,
},
}
impl AddProviderError {
pub fn key(&self) -> &record::Key {
match self {
AddProviderError::Timeout { key, .. } => key,
}
}
pub fn into_key(self) -> record::Key {
match self {
AddProviderError::Timeout { key, .. } => key,
}
}
}
impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
KadPeer {
node_id: e.node.key.into_preimage(),
multiaddrs: e.node.value.into_vec(),
connection_ty: match e.status {
NodeStatus::Connected => KadConnectionType::Connected,
NodeStatus::Disconnected => KadConnectionType::NotConnected
}
}
}
}
struct QueryInner {
info: QueryInfo,
addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
pending_rpcs: SmallVec<[(PeerId, KademliaHandlerIn<QueryId>); K_VALUE.get()]>
}
impl QueryInner {
fn new(info: QueryInfo) -> Self {
QueryInner {
info,
addresses: Default::default(),
pending_rpcs: SmallVec::default()
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddProviderContext {
Publish,
Republish,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PutRecordContext {
Publish,
Republish,
Replicate,
Cache,
}
#[derive(Debug, Clone)]
pub enum QueryInfo {
Bootstrap {
peer: PeerId,
remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>
},
GetClosestPeers { key: Vec<u8> },
GetProviders {
key: record::Key,
providers: HashSet<PeerId>,
},
AddProvider {
key: record::Key,
phase: AddProviderPhase,
context: AddProviderContext,
},
PutRecord {
record: Record,
quorum: NonZeroUsize,
phase: PutRecordPhase,
context: PutRecordContext,
},
GetRecord {
key: record::Key,
records: Vec<PeerRecord>,
quorum: NonZeroUsize,
cache_at: Option<kbucket::Key<PeerId>>,
},
}
impl QueryInfo {
fn to_request(&self, query_id: QueryId) -> KademliaHandlerIn<QueryId> {
match &self {
QueryInfo::Bootstrap { peer, .. } => KademliaHandlerIn::FindNodeReq {
key: peer.to_bytes(),
user_data: query_id,
},
QueryInfo::GetClosestPeers { key, .. } => KademliaHandlerIn::FindNodeReq {
key: key.clone(),
user_data: query_id,
},
QueryInfo::GetProviders { key, .. } => KademliaHandlerIn::GetProvidersReq {
key: key.clone(),
user_data: query_id,
},
QueryInfo::AddProvider { key, phase, .. } => match phase {
AddProviderPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
key: key.to_vec(),
user_data: query_id,
},
AddProviderPhase::AddProvider { provider_id, external_addresses, .. } => {
KademliaHandlerIn::AddProvider {
key: key.clone(),
provider: crate::protocol::KadPeer {
node_id: *provider_id,
multiaddrs: external_addresses.clone(),
connection_ty: crate::protocol::KadConnectionType::Connected,
}
}
}
},
QueryInfo::GetRecord { key, .. } => KademliaHandlerIn::GetRecord {
key: key.clone(),
user_data: query_id,
},
QueryInfo::PutRecord { record, phase, .. } => match phase {
PutRecordPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
key: record.key.to_vec(),
user_data: query_id,
},
PutRecordPhase::PutRecord { .. } => KademliaHandlerIn::PutRecord {
record: record.clone(),
user_data: query_id
}
}
}
}
}
#[derive(Debug, Clone)]
pub enum AddProviderPhase {
GetClosestPeers,
AddProvider {
provider_id: PeerId,
external_addresses: Vec<Multiaddr>,
get_closest_peers_stats: QueryStats,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PutRecordPhase {
GetClosestPeers,
PutRecord {
success: Vec<PeerId>,
get_closest_peers_stats: QueryStats,
},
}
pub struct QueryMut<'a> {
query: &'a mut Query<QueryInner>,
}
impl<'a> QueryMut<'a> {
pub fn id(&self) -> QueryId {
self.query.id()
}
pub fn info(&self) -> &QueryInfo {
&self.query.inner.info
}
pub fn stats(&self) -> &QueryStats {
self.query.stats()
}
pub fn finish(&mut self) {
self.query.finish()
}
}
pub struct QueryRef<'a> {
query: &'a Query<QueryInner>,
}
impl<'a> QueryRef<'a> {
pub fn id(&self) -> QueryId {
self.query.id()
}
pub fn info(&self) -> &QueryInfo {
&self.query.inner.info
}
pub fn stats(&self) -> &QueryStats {
self.query.stats()
}
}
#[derive(Debug, Clone)]
pub struct NoKnownPeers();
impl fmt::Display for NoKnownPeers {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "No known peers.")
}
}
impl std::error::Error for NoKnownPeers {}
pub enum RoutingUpdate {
Success,
Pending,
Failed,
}
const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20;