mod peers;
use peers::PeersIterState;
use peers::closest::{ClosestPeersIterConfig, ClosestPeersIter, disjoint::ClosestDisjointPeersIter};
use peers::fixed::FixedPeersIter;
use crate::{ALPHA_VALUE, K_VALUE};
use crate::kbucket::{Key, KeyBytes};
use either::Either;
use fnv::FnvHashMap;
use libp2p_core::PeerId;
use std::{time::Duration, num::NonZeroUsize};
use wasm_timer::Instant;
pub struct QueryPool<TInner> {
next_id: usize,
config: QueryConfig,
queries: FnvHashMap<QueryId, Query<TInner>>,
}
pub enum QueryPoolState<'a, TInner> {
Idle,
Waiting(Option<(&'a mut Query<TInner>, PeerId)>),
Finished(Query<TInner>),
Timeout(Query<TInner>)
}
impl<TInner> QueryPool<TInner> {
pub fn new(config: QueryConfig) -> Self {
QueryPool {
next_id: 0,
config,
queries: Default::default()
}
}
pub fn config(&self) -> &QueryConfig {
&self.config
}
pub fn iter(&self) -> impl Iterator<Item = &Query<TInner>> {
self.queries.values()
}
pub fn size(&self) -> usize {
self.queries.len()
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query<TInner>> {
self.queries.values_mut()
}
pub fn add_fixed<I>(&mut self, peers: I, inner: TInner) -> QueryId
where
I: IntoIterator<Item = PeerId>
{
let id = self.next_query_id();
self.continue_fixed(id, peers, inner);
id
}
pub fn continue_fixed<I>(&mut self, id: QueryId, peers: I, inner: TInner)
where
I: IntoIterator<Item = PeerId>
{
assert!(!self.queries.contains_key(&id));
let parallelism = self.config.replication_factor;
let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
let query = Query::new(id, peer_iter, inner);
self.queries.insert(id, query);
}
pub fn add_iter_closest<T, I>(&mut self, target: T, peers: I, inner: TInner) -> QueryId
where
T: Into<KeyBytes> + Clone,
I: IntoIterator<Item = Key<PeerId>>
{
let id = self.next_query_id();
self.continue_iter_closest(id, target, peers, inner);
id
}
pub fn continue_iter_closest<T, I>(&mut self, id: QueryId, target: T, peers: I, inner: TInner)
where
T: Into<KeyBytes> + Clone,
I: IntoIterator<Item = Key<PeerId>>
{
let cfg = ClosestPeersIterConfig {
num_results: self.config.replication_factor,
parallelism: self.config.parallelism,
.. ClosestPeersIterConfig::default()
};
let peer_iter = if self.config.disjoint_query_paths {
QueryPeerIter::ClosestDisjoint(
ClosestDisjointPeersIter::with_config(cfg, target, peers),
)
} else {
QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
};
let query = Query::new(id, peer_iter, inner);
self.queries.insert(id, query);
}
fn next_query_id(&mut self) -> QueryId {
let id = QueryId(self.next_id);
self.next_id = self.next_id.wrapping_add(1);
id
}
pub fn get(&self, id: &QueryId) -> Option<&Query<TInner>> {
self.queries.get(id)
}
pub fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query<TInner>> {
self.queries.get_mut(id)
}
pub fn poll(&mut self, now: Instant) -> QueryPoolState<'_, TInner> {
let mut finished = None;
let mut timeout = None;
let mut waiting = None;
for (&query_id, query) in self.queries.iter_mut() {
query.stats.start = query.stats.start.or(Some(now));
match query.next(now) {
PeersIterState::Finished => {
finished = Some(query_id);
break
}
PeersIterState::Waiting(Some(peer_id)) => {
let peer = peer_id.into_owned();
waiting = Some((query_id, peer));
break
}
PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
let elapsed = now - query.stats.start.unwrap_or(now);
if elapsed >= self.config.timeout {
timeout = Some(query_id);
break
}
}
}
}
if let Some((query_id, peer_id)) = waiting {
let query = self.queries.get_mut(&query_id).expect("s.a.");
return QueryPoolState::Waiting(Some((query, peer_id)))
}
if let Some(query_id) = finished {
let mut query = self.queries.remove(&query_id).expect("s.a.");
query.stats.end = Some(now);
return QueryPoolState::Finished(query)
}
if let Some(query_id) = timeout {
let mut query = self.queries.remove(&query_id).expect("s.a.");
query.stats.end = Some(now);
return QueryPoolState::Timeout(query)
}
if self.queries.is_empty() {
return QueryPoolState::Idle
} else {
return QueryPoolState::Waiting(None)
}
}
}
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct QueryId(usize);
#[derive(Debug, Clone)]
pub struct QueryConfig {
pub timeout: Duration,
pub replication_factor: NonZeroUsize,
pub parallelism: NonZeroUsize,
pub disjoint_query_paths: bool,
}
impl Default for QueryConfig {
fn default() -> Self {
QueryConfig {
timeout: Duration::from_secs(60),
replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
parallelism: ALPHA_VALUE,
disjoint_query_paths: false,
}
}
}
pub struct Query<TInner> {
id: QueryId,
peer_iter: QueryPeerIter,
stats: QueryStats,
pub inner: TInner,
}
enum QueryPeerIter {
Closest(ClosestPeersIter),
ClosestDisjoint(ClosestDisjointPeersIter),
Fixed(FixedPeersIter)
}
impl<TInner> Query<TInner> {
fn new(id: QueryId, peer_iter: QueryPeerIter, inner: TInner) -> Self {
Query { id, inner, peer_iter, stats: QueryStats::empty() }
}
pub fn id(&self) -> QueryId {
self.id
}
pub fn stats(&self) -> &QueryStats {
&self.stats
}
pub fn on_failure(&mut self, peer: &PeerId) {
let updated = match &mut self.peer_iter {
QueryPeerIter::Closest(iter) => iter.on_failure(peer),
QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
QueryPeerIter::Fixed(iter) => iter.on_failure(peer)
};
if updated {
self.stats.failure += 1;
}
}
pub fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
where
I: IntoIterator<Item = PeerId>
{
let updated = match &mut self.peer_iter {
QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
QueryPeerIter::Fixed(iter) => iter.on_success(peer)
};
if updated {
self.stats.success += 1;
}
}
pub fn is_waiting(&self, peer: &PeerId) -> bool {
match &self.peer_iter {
QueryPeerIter::Closest(iter) => iter.is_waiting(peer),
QueryPeerIter::ClosestDisjoint(iter) => iter.is_waiting(peer),
QueryPeerIter::Fixed(iter) => iter.is_waiting(peer)
}
}
fn next(&mut self, now: Instant) -> PeersIterState<'_> {
let state = match &mut self.peer_iter {
QueryPeerIter::Closest(iter) => iter.next(now),
QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
QueryPeerIter::Fixed(iter) => iter.next()
};
if let PeersIterState::Waiting(Some(_)) = state {
self.stats.requests += 1;
}
state
}
pub fn try_finish<'a, I>(&mut self, peers: I) -> bool
where
I: IntoIterator<Item = &'a PeerId>
{
match &mut self.peer_iter {
QueryPeerIter::Closest(iter) => { iter.finish(); true },
QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
QueryPeerIter::Fixed(iter) => { iter.finish(); true }
}
}
pub fn finish(&mut self) {
match &mut self.peer_iter {
QueryPeerIter::Closest(iter) => iter.finish(),
QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
QueryPeerIter::Fixed(iter) => iter.finish()
}
}
pub fn is_finished(&self) -> bool {
match &self.peer_iter {
QueryPeerIter::Closest(iter) => iter.is_finished(),
QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
QueryPeerIter::Fixed(iter) => iter.is_finished()
}
}
pub fn into_result(self) -> QueryResult<TInner, impl Iterator<Item = PeerId>> {
let peers = match self.peer_iter {
QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result())
};
QueryResult { peers, inner: self.inner, stats: self.stats }
}
}
pub struct QueryResult<TInner, TPeers> {
pub inner: TInner,
pub peers: TPeers,
pub stats: QueryStats
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryStats {
requests: u32,
success: u32,
failure: u32,
start: Option<Instant>,
end: Option<Instant>
}
impl QueryStats {
pub fn empty() -> Self {
QueryStats {
requests: 0,
success: 0,
failure: 0,
start: None,
end: None,
}
}
pub fn num_requests(&self) -> u32 {
self.requests
}
pub fn num_successes(&self) -> u32 {
self.success
}
pub fn num_failures(&self) -> u32 {
self.failure
}
pub fn num_pending(&self) -> u32 {
self.requests - (self.success + self.failure)
}
pub fn duration(&self) -> Option<Duration> {
if let Some(s) = self.start {
if let Some(e) = self.end {
Some(e - s)
} else {
Some(Instant::now() - s)
}
} else {
None
}
}
pub fn merge(self, other: QueryStats) -> Self {
QueryStats {
requests: self.requests + other.requests,
success: self.success + other.success,
failure: self.failure + other.failure,
start: match (self.start, other.start) {
(Some(a), Some(b)) => Some(std::cmp::min(a, b)),
(a, b) => a.or(b)
},
end: std::cmp::max(self.end, other.end)
}
}
}