1mod peers;
22
23use std::{num::NonZeroUsize, time::Duration};
24
25use either::Either;
26use fnv::FnvHashMap;
27use libp2p_core::Multiaddr;
28use libp2p_identity::PeerId;
29use peers::{
30 closest::{disjoint::ClosestDisjointPeersIter, ClosestPeersIter, ClosestPeersIterConfig},
31 fixed::FixedPeersIter,
32 PeersIterState,
33};
34use smallvec::SmallVec;
35use web_time::Instant;
36
37use crate::{
38 behaviour::PeerInfo,
39 handler::HandlerIn,
40 kbucket::{Key, KeyBytes},
41 QueryInfo, ALPHA_VALUE, K_VALUE,
42};
43
44pub(crate) struct QueryPool {
50 next_id: usize,
51 config: QueryConfig,
52 queries: FnvHashMap<QueryId, Query>,
53}
54
55pub(crate) enum QueryPoolState<'a> {
57 Idle,
59 Waiting(Option<(&'a mut Query, PeerId)>),
62 Finished(Query),
64 Timeout(Query),
66}
67
68impl QueryPool {
69 pub(crate) fn new(config: QueryConfig) -> Self {
71 QueryPool {
72 next_id: 0,
73 config,
74 queries: Default::default(),
75 }
76 }
77
78 pub(crate) fn config(&self) -> &QueryConfig {
80 &self.config
81 }
82
83 pub(crate) fn iter(&self) -> impl Iterator<Item = &Query> {
85 self.queries.values()
86 }
87
88 pub(crate) fn size(&self) -> usize {
90 self.queries.len()
91 }
92
93 pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query> {
95 self.queries.values_mut()
96 }
97
98 pub(crate) fn add_fixed<I>(&mut self, peers: I, info: QueryInfo) -> QueryId
100 where
101 I: IntoIterator<Item = PeerId>,
102 {
103 let id = self.next_query_id();
104 self.continue_fixed(id, peers, info);
105 id
106 }
107
108 pub(crate) fn continue_fixed<I>(&mut self, id: QueryId, peers: I, info: QueryInfo)
112 where
113 I: IntoIterator<Item = PeerId>,
114 {
115 assert!(!self.queries.contains_key(&id));
116 let parallelism = self.config.replication_factor;
117 let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
118 let query = Query::new(id, peer_iter, info);
119 self.queries.insert(id, query);
120 }
121
122 pub(crate) fn add_iter_closest<T, I>(&mut self, target: T, peers: I, info: QueryInfo) -> QueryId
124 where
125 T: Into<KeyBytes> + Clone,
126 I: IntoIterator<Item = Key<PeerId>>,
127 {
128 let id = self.next_query_id();
129 self.continue_iter_closest(id, target, peers, info);
130 id
131 }
132
133 pub(crate) fn continue_iter_closest<T, I>(
135 &mut self,
136 id: QueryId,
137 target: T,
138 peers: I,
139 info: QueryInfo,
140 ) where
141 T: Into<KeyBytes> + Clone,
142 I: IntoIterator<Item = Key<PeerId>>,
143 {
144 let num_results = match info {
145 QueryInfo::GetClosestPeers {
146 num_results: Some(val),
147 ..
148 } => val,
149 _ => self.config.replication_factor,
150 };
151
152 let cfg = ClosestPeersIterConfig {
153 num_results,
154 parallelism: self.config.parallelism,
155 ..ClosestPeersIterConfig::default()
156 };
157
158 let peer_iter = if self.config.disjoint_query_paths {
159 QueryPeerIter::ClosestDisjoint(ClosestDisjointPeersIter::with_config(
160 cfg, target, peers,
161 ))
162 } else {
163 QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
164 };
165
166 let query = Query::new(id, peer_iter, info);
167 self.queries.insert(id, query);
168 }
169
170 fn next_query_id(&mut self) -> QueryId {
171 let id = QueryId(self.next_id);
172 self.next_id = self.next_id.wrapping_add(1);
173 id
174 }
175
176 pub(crate) fn get(&self, id: &QueryId) -> Option<&Query> {
178 self.queries.get(id)
179 }
180
181 pub(crate) fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query> {
183 self.queries.get_mut(id)
184 }
185
186 pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_> {
188 let mut finished = None;
189 let mut timeout = None;
190 let mut waiting = None;
191
192 for (&query_id, query) in self.queries.iter_mut() {
193 query.stats.start = query.stats.start.or(Some(now));
194 match query.next(now) {
195 PeersIterState::Finished => {
196 finished = Some(query_id);
197 break;
198 }
199 PeersIterState::Waiting(Some(peer_id)) => {
200 let peer = peer_id.into_owned();
201 waiting = Some((query_id, peer));
202 break;
203 }
204 PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
205 let elapsed = now - query.stats.start.unwrap_or(now);
206 if elapsed >= self.config.timeout {
207 timeout = Some(query_id);
208 break;
209 }
210 }
211 }
212 }
213
214 if let Some((query_id, peer_id)) = waiting {
215 let query = self.queries.get_mut(&query_id).expect("s.a.");
216 return QueryPoolState::Waiting(Some((query, peer_id)));
217 }
218
219 if let Some(query_id) = finished {
220 let mut query = self.queries.remove(&query_id).expect("s.a.");
221 query.stats.end = Some(now);
222 return QueryPoolState::Finished(query);
223 }
224
225 if let Some(query_id) = timeout {
226 let mut query = self.queries.remove(&query_id).expect("s.a.");
227 query.stats.end = Some(now);
228 return QueryPoolState::Timeout(query);
229 }
230
231 if self.queries.is_empty() {
232 QueryPoolState::Idle
233 } else {
234 QueryPoolState::Waiting(None)
235 }
236 }
237}
238
239#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
241pub struct QueryId(usize);
242
243impl std::fmt::Display for QueryId {
244 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245 write!(f, "{}", self.0)
246 }
247}
248
249#[derive(Debug, Clone)]
251pub(crate) struct QueryConfig {
252 pub(crate) timeout: Duration,
256 pub(crate) replication_factor: NonZeroUsize,
260 pub(crate) parallelism: NonZeroUsize,
264 pub(crate) disjoint_query_paths: bool,
268}
269
270impl Default for QueryConfig {
271 fn default() -> Self {
272 QueryConfig {
273 timeout: Duration::from_secs(60),
274 replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
275 parallelism: ALPHA_VALUE,
276 disjoint_query_paths: false,
277 }
278 }
279}
280
281pub(crate) struct Query {
283 id: QueryId,
285 pub(crate) peers: QueryPeers,
287 pub(crate) stats: QueryStats,
289 pub(crate) info: QueryInfo,
291 pub(crate) pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>,
296}
297
298pub(crate) struct QueryPeers {
300 pub(crate) addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
302 peer_iter: QueryPeerIter,
304}
305
306impl QueryPeers {
307 pub(crate) fn into_peerids_iter(self) -> impl Iterator<Item = PeerId> {
309 match self.peer_iter {
310 QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
311 QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
312 QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
313 }
314 }
315
316 pub(crate) fn into_peerinfos_iter(mut self) -> impl Iterator<Item = PeerInfo> {
319 match self.peer_iter {
320 QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
321 QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
322 QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
323 }
324 .map(move |peer_id| {
325 let addrs = self.addresses.remove(&peer_id).unwrap_or_default().to_vec();
326 PeerInfo { peer_id, addrs }
327 })
328 }
329}
330
331enum QueryPeerIter {
333 Closest(ClosestPeersIter),
334 ClosestDisjoint(ClosestDisjointPeersIter),
335 Fixed(FixedPeersIter),
336}
337
338impl Query {
339 fn new(id: QueryId, peer_iter: QueryPeerIter, info: QueryInfo) -> Self {
341 Query {
342 id,
343 info,
344 peers: QueryPeers {
345 addresses: Default::default(),
346 peer_iter,
347 },
348 pending_rpcs: SmallVec::default(),
349 stats: QueryStats::empty(),
350 }
351 }
352
353 pub(crate) fn id(&self) -> QueryId {
355 self.id
356 }
357
358 pub(crate) fn stats(&self) -> &QueryStats {
360 &self.stats
361 }
362
363 pub(crate) fn on_failure(&mut self, peer: &PeerId) {
365 let updated = match &mut self.peers.peer_iter {
366 QueryPeerIter::Closest(iter) => iter.on_failure(peer),
367 QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
368 QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
369 };
370 if updated {
371 self.stats.failure += 1;
372 }
373 }
374
375 pub(crate) fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
379 where
380 I: IntoIterator<Item = PeerId>,
381 {
382 let updated = match &mut self.peers.peer_iter {
383 QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
384 QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
385 QueryPeerIter::Fixed(iter) => iter.on_success(peer),
386 };
387 if updated {
388 self.stats.success += 1;
389 }
390 }
391
392 fn next(&mut self, now: Instant) -> PeersIterState<'_> {
394 let state = match &mut self.peers.peer_iter {
395 QueryPeerIter::Closest(iter) => iter.next(now),
396 QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
397 QueryPeerIter::Fixed(iter) => iter.next(),
398 };
399
400 if let PeersIterState::Waiting(Some(_)) = state {
401 self.stats.requests += 1;
402 }
403
404 state
405 }
406
407 pub(crate) fn try_finish<'a, I>(&mut self, peers: I) -> bool
425 where
426 I: IntoIterator<Item = &'a PeerId>,
427 {
428 match &mut self.peers.peer_iter {
429 QueryPeerIter::Closest(iter) => {
430 iter.finish();
431 true
432 }
433 QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
434 QueryPeerIter::Fixed(iter) => {
435 iter.finish();
436 true
437 }
438 }
439 }
440
441 pub(crate) fn finish(&mut self) {
446 match &mut self.peers.peer_iter {
447 QueryPeerIter::Closest(iter) => iter.finish(),
448 QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
449 QueryPeerIter::Fixed(iter) => iter.finish(),
450 }
451 }
452
453 pub(crate) fn is_finished(&self) -> bool {
458 match &self.peers.peer_iter {
459 QueryPeerIter::Closest(iter) => iter.is_finished(),
460 QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
461 QueryPeerIter::Fixed(iter) => iter.is_finished(),
462 }
463 }
464}
465
466#[derive(Clone, Debug, PartialEq, Eq)]
468pub struct QueryStats {
469 requests: u32,
470 success: u32,
471 failure: u32,
472 start: Option<Instant>,
473 end: Option<Instant>,
474}
475
476impl QueryStats {
477 pub fn empty() -> Self {
478 QueryStats {
479 requests: 0,
480 success: 0,
481 failure: 0,
482 start: None,
483 end: None,
484 }
485 }
486
487 pub fn num_requests(&self) -> u32 {
489 self.requests
490 }
491
492 pub fn num_successes(&self) -> u32 {
494 self.success
495 }
496
497 pub fn num_failures(&self) -> u32 {
499 self.failure
500 }
501
502 pub fn num_pending(&self) -> u32 {
507 self.requests - (self.success + self.failure)
508 }
509
510 pub fn duration(&self) -> Option<Duration> {
518 if let Some(s) = self.start {
519 if let Some(e) = self.end {
520 Some(e - s)
521 } else {
522 Some(Instant::now() - s)
523 }
524 } else {
525 None
526 }
527 }
528
529 pub fn merge(self, other: QueryStats) -> Self {
536 QueryStats {
537 requests: self.requests + other.requests,
538 success: self.success + other.success,
539 failure: self.failure + other.failure,
540 start: match (self.start, other.start) {
541 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
542 (a, b) => a.or(b),
543 },
544 end: std::cmp::max(self.end, other.end),
545 }
546 }
547}