1use crate::LOG_TARGET;
20use log::trace;
21use sc_network_common::sync::message;
22use sc_network_types::PeerId;
23use sp_runtime::traits::{Block as BlockT, NumberFor, One};
24use std::{
25 cmp,
26 collections::{BTreeMap, HashMap},
27 ops::Range,
28};
29
/// A downloaded block paired with the peer it came from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData<B: BlockT> {
    /// The block message itself.
    pub block: message::BlockData<B>,
    /// Peer the block was received from, if known. `BlockCollection::insert` always records
    /// the delivering peer here.
    pub origin: Option<PeerId>,
}
38
/// State of one contiguous range of blocks tracked by `BlockCollection`.
#[derive(Debug)]
enum BlockRangeState<B: BlockT> {
    /// The range has been handed out for download. `len` is the number of blocks in the range
    /// and `downloading` counts the requests currently outstanding for it.
    Downloading { len: NumberFor<B>, downloading: u32 },
    /// Blocks received for the range, as stored by `BlockCollection::insert`.
    Complete(Vec<BlockData<B>>),
    /// The blocks were returned by `BlockCollection::ready_blocks`; only the length of the
    /// range is kept.
    Queued { len: NumberFor<B> },
}
45
46impl<B: BlockT> BlockRangeState<B> {
47 pub fn len(&self) -> NumberFor<B> {
48 match *self {
49 Self::Downloading { len, .. } => len,
50 Self::Complete(ref blocks) => (blocks.len() as u32).into(),
51 Self::Queued { len } => len,
52 }
53 }
54}
55
56#[derive(Default)]
58pub struct BlockCollection<B: BlockT> {
59 blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
61 peer_requests: HashMap<PeerId, NumberFor<B>>,
62 queued_blocks: HashMap<B::Hash, (NumberFor<B>, NumberFor<B>)>,
65}
66
67impl<B: BlockT> BlockCollection<B> {
68 pub fn new() -> Self {
70 Self {
71 blocks: BTreeMap::new(),
72 peer_requests: HashMap::new(),
73 queued_blocks: HashMap::new(),
74 }
75 }
76
77 pub fn clear(&mut self) {
79 self.blocks.clear();
80 self.peer_requests.clear();
81 }
82
83 pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: PeerId) {
85 if blocks.is_empty() {
86 return
87 }
88
89 match self.blocks.get(&start) {
90 Some(&BlockRangeState::Downloading { .. }) => {
91 trace!(target: LOG_TARGET, "Inserting block data still marked as being downloaded: {}", start);
92 },
93 Some(BlockRangeState::Complete(existing)) if existing.len() >= blocks.len() => {
94 trace!(target: LOG_TARGET, "Ignored block data already downloaded: {}", start);
95 return
96 },
97 _ => (),
98 }
99
100 self.blocks.insert(
101 start,
102 BlockRangeState::Complete(
103 blocks.into_iter().map(|b| BlockData { origin: Some(who), block: b }).collect(),
104 ),
105 );
106 }
107
108 pub fn needed_blocks(
111 &mut self,
112 who: PeerId,
113 count: u32,
114 peer_best: NumberFor<B>,
115 common: NumberFor<B>,
116 max_parallel: u32,
117 max_ahead: u32,
118 ) -> Option<Range<NumberFor<B>>> {
119 if peer_best <= common {
120 return None
122 }
123 let first_different = common + <NumberFor<B>>::one();
125 let count = (count as u32).into();
126 let (mut range, downloading) = {
127 let mut downloading_iter = self.blocks.iter().peekable();
129 let mut prev: Option<(&NumberFor<B>, &BlockRangeState<B>)> = None;
130 loop {
131 let next = downloading_iter.next();
132 break match (prev, next) {
133 (Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
138 if downloading < max_parallel && *start >= first_different =>
139 (*start..*start + *len, downloading),
140 (Some((start, r)), Some((next_start, _)))
143 if *start + r.len() < *next_start &&
144 *start + r.len() >= first_different =>
145 (*start + r.len()..cmp::min(*next_start, *start + r.len() + count), 0),
146 (Some((start, r)), None) if *start + r.len() >= first_different =>
149 (*start + r.len()..*start + r.len() + count, 0),
150 (None, None) => (first_different..first_different + count, 0),
153 (None, Some((start, _))) if *start > first_different =>
155 (first_different..cmp::min(first_different + count, *start), 0),
156 _ => {
158 prev = next;
159 continue
160 },
161 }
162 }
163 };
164 if range.start > peer_best {
166 trace!(target: LOG_TARGET, "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
167 return None
168 }
169 range.end = cmp::min(peer_best + One::one(), range.end);
170
171 if self
172 .blocks
173 .iter()
174 .next()
175 .map_or(false, |(n, _)| range.start > *n + max_ahead.into())
176 {
177 trace!(target: LOG_TARGET, "Too far ahead for peer {} ({})", who, range.start);
178 return None
179 }
180
181 self.peer_requests.insert(who, range.start);
182 self.blocks.insert(
183 range.start,
184 BlockRangeState::Downloading {
185 len: range.end - range.start,
186 downloading: downloading + 1,
187 },
188 );
189 if range.end <= range.start {
190 panic!(
191 "Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}",
192 range, count, peer_best, common, self.blocks
193 );
194 }
195 Some(range)
196 }
197
198 pub fn ready_blocks(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
204 let mut ready = Vec::new();
205
206 let mut prev = from;
207 for (&start, range_data) in &mut self.blocks {
208 if start > prev {
209 break
210 }
211 let len = match range_data {
212 BlockRangeState::Complete(blocks) => {
213 let len = (blocks.len() as u32).into();
214 prev = start + len;
215 if let Some(BlockData { block, .. }) = blocks.first() {
216 self.queued_blocks
217 .insert(block.hash, (start, start + (blocks.len() as u32).into()));
218 }
219 ready.append(blocks);
221 len
222 },
223 BlockRangeState::Queued { .. } => continue,
224 _ => break,
225 };
226 *range_data = BlockRangeState::Queued { len };
227 }
228 trace!(target: LOG_TARGET, "{} blocks ready for import", ready.len());
229 ready
230 }
231
232 pub fn clear_queued(&mut self, hash: &B::Hash) {
233 if let Some((from, to)) = self.queued_blocks.remove(hash) {
234 let mut block_num = from;
235 while block_num < to {
236 self.blocks.remove(&block_num);
237 block_num += One::one();
238 }
239 trace!(target: LOG_TARGET, "Cleared blocks from {:?} to {:?}", from, to);
240 }
241 }
242
243 pub fn clear_peer_download(&mut self, who: &PeerId) {
244 if let Some(start) = self.peer_requests.remove(who) {
245 let remove = match self.blocks.get_mut(&start) {
246 Some(&mut BlockRangeState::Downloading { ref mut downloading, .. })
247 if *downloading > 1 =>
248 {
249 *downloading -= 1;
250 false
251 },
252 Some(&mut BlockRangeState::Downloading { .. }) => true,
253 _ => false,
254 };
255 if remove {
256 self.blocks.remove(&start);
257 }
258 }
259 }
260}
261
262#[cfg(test)]
263mod test {
264 use super::{BlockCollection, BlockData, BlockRangeState};
265 use sc_network_common::sync::message;
266 use sc_network_types::PeerId;
267 use sp_core::H256;
268 use sp_runtime::testing::{Block as RawBlock, MockCallU64, TestXt};
269
270 type Block = RawBlock<TestXt<MockCallU64, ()>>;
271
272 fn is_empty(bc: &BlockCollection<Block>) -> bool {
273 bc.blocks.is_empty() && bc.peer_requests.is_empty()
274 }
275
276 fn generate_blocks(n: usize) -> Vec<message::BlockData<Block>> {
277 (0..n)
278 .map(|_| message::generic::BlockData {
279 hash: H256::random(),
280 header: None,
281 body: None,
282 indexed_body: None,
283 message_queue: None,
284 receipt: None,
285 justification: None,
286 justifications: None,
287 })
288 .collect()
289 }
290
    // A fresh collection is empty, stops being empty once blocks are inserted, and is empty
    // again after `clear`.
    #[test]
    fn create_clear() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));
        bc.insert(1, generate_blocks(100), PeerId::random());
        assert!(!is_empty(&bc));
        bc.clear();
        assert!(is_empty(&bc));
    }
300
    // Walks through a full download: ranges are assigned to three peers, delivered out of
    // order, and handed out for import only once they connect to the requested start.
    #[test]
    fn insert_blocks() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));
        let peer0 = PeerId::random();
        let peer1 = PeerId::random();
        let peer2 = PeerId::random();

        let blocks = generate_blocks(150);
        // With `max_parallel = 1` every peer is assigned its own consecutive range.
        assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(1..41));
        assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(41..81));
        assert_eq!(bc.needed_blocks(peer2, 40, 150, 0, 1, 200), Some(81..121));

        // peer1 delivers first, but nothing is ready while the range starting at 1 is still
        // being downloaded.
        bc.clear_peer_download(&peer1);
        bc.insert(41, blocks[41..81].to_vec(), peer1);
        assert_eq!(bc.ready_blocks(1), vec![]);
        // The next range for peer1 is cropped to the peer's best block (150).
        assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(121..151));
        // peer0 delivers only the first 10 blocks of its range.
        bc.clear_peer_download(&peer0);
        bc.insert(1, blocks[1..11].to_vec(), peer0);

        // The undelivered remainder shows up as a gap and is requested again.
        assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(11..41));
        // Only the 10 delivered blocks are ready; the range at 11 is still being downloaded.
        assert_eq!(
            bc.ready_blocks(1),
            blocks[1..11]
                .iter()
                .map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
                .collect::<Vec<_>>()
        );

        bc.clear_peer_download(&peer0);
        bc.insert(11, blocks[11..41].to_vec(), peer0);

        // The range at 11 starts at or below 12, so it is returned together with the range
        // at 41 that directly follows it.
        let ready = bc.ready_blocks(12);
        assert_eq!(
            ready[..30],
            blocks[11..41]
                .iter()
                .map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
                .collect::<Vec<_>>()[..]
        );
        assert_eq!(
            ready[30..],
            blocks[41..81]
                .iter()
                .map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
                .collect::<Vec<_>>()[..]
        );

        // Dropping peer2's download re-opens 81..121, which is requested again even with
        // the common number raised to 80.
        bc.clear_peer_download(&peer2);
        assert_eq!(bc.needed_blocks(peer2, 40, 150, 80, 1, 200), Some(81..121));
        bc.clear_peer_download(&peer2);
        bc.insert(81, blocks[81..121].to_vec(), peer2);
        bc.clear_peer_download(&peer1);
        bc.insert(121, blocks[121..150].to_vec(), peer1);

        // The first range that is not yet queued starts at 81, which is above 80.
        assert_eq!(bc.ready_blocks(80), vec![]);
        let ready = bc.ready_blocks(81);
        assert_eq!(
            ready[..40],
            blocks[81..121]
                .iter()
                .map(|b| BlockData { block: b.clone(), origin: Some(peer2) })
                .collect::<Vec<_>>()[..]
        );
        assert_eq!(
            ready[40..],
            blocks[121..150]
                .iter()
                .map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
                .collect::<Vec<_>>()[..]
        );
    }
373
    // Checks the `max_ahead` limit when the tracked ranges are far apart.
    #[test]
    fn large_gap() {
        let mut bc: BlockCollection<Block> = BlockCollection::new();
        bc.blocks.insert(100, BlockRangeState::Downloading { len: 128, downloading: 1 });
        let blocks = generate_blocks(10)
            .into_iter()
            .map(|b| BlockData { block: b, origin: None })
            .collect();
        bc.blocks.insert(114305, BlockRangeState::Complete(blocks));

        let peer0 = PeerId::random();
        // The gap in front of the first tracked range is requested first.
        assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), Some(1..100));
        // The next candidate starts at 228, more than `max_ahead = 200` above the lowest
        // tracked range start (1), so nothing is requested.
        assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), None);
        // With a larger `max_ahead` the same candidate range is handed out.
        assert_eq!(
            bc.needed_blocks(peer0, 128, 10000, 0, 1, 200000),
            Some(100 + 128..100 + 128 + 128)
        );
    }
392
    // A range that has been handed out for import must not be requested a second time.
    #[test]
    fn no_duplicate_requests_on_fork() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));
        let peer = PeerId::random();

        let blocks = generate_blocks(10);

        // Request the first five blocks above the common number.
        assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));

        // Deliver them.
        bc.clear_peer_download(&peer);
        bc.insert(40, blocks[..5].to_vec(), peer);

        // Hand them out for import; this marks the range as queued.
        let ready = bc.ready_blocks(48);
        assert_eq!(
            ready,
            blocks[..5]
                .iter()
                .map(|b| BlockData { block: b.clone(), origin: Some(peer) })
                .collect::<Vec<_>>()
        );

        // The next request continues after the queued range instead of repeating it.
        assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
    }
420
    // `clear_queued` removes every tracked entry covered by the queued range, not just the
    // entry at its start.
    #[test]
    fn clear_queued_subsequent_ranges() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));
        let peer = PeerId::random();

        let blocks = generate_blocks(10);

        // Two consecutive ranges are requested from the same peer.
        assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
        assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));

        // A single response of 10 blocks, covering both ranges, is stored at 40.
        bc.clear_peer_download(&peer);
        bc.insert(40, blocks.to_vec(), peer);

        // All 10 blocks are handed out for import.
        let ready = bc.ready_blocks(1000);
        assert_eq!(
            ready,
            blocks
                .iter()
                .map(|b| BlockData { block: b.clone(), origin: Some(peer) })
                .collect::<Vec<_>>()
        );

        // Clearing by the hash of the first block leaves nothing behind.
        bc.clear_queued(&blocks[0].hash);
        assert!(bc.blocks.is_empty());
        assert!(bc.queued_blocks.is_empty());
    }
451
    // A range that is being downloaded is handed to further peers until `max_parallel`
    // downloads are outstanding; after that a new range is started.
    #[test]
    fn downloaded_range_is_requested_from_max_parallel_peers() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        let count = 5;
        // Two peers may download the same range at the same time.
        let max_parallel = 2;
        let max_ahead = 200;

        let peer1 = PeerId::random();
        let peer2 = PeerId::random();
        let peer3 = PeerId::random();

        // All three peers share the same best and common block numbers.
        let best = 100;
        let common = 10;

        assert_eq!(
            bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
            Some(11..16)
        );
        // Second peer: same range, since only one download is outstanding so far.
        assert_eq!(
            bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
            Some(11..16)
        );
        // Third peer: the first range is at its limit, so the next range is started.
        assert_eq!(
            bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
            Some(16..21)
        );
    }
    // A range that is being downloaded is not handed to a peer whose common number lies at
    // or above the start of that range, even if `max_parallel` would allow it.
    #[test]
    fn downloaded_range_not_requested_from_peers_with_higher_common_number() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        let count = 5;
        let max_parallel = 2;
        let max_ahead = 200;

        let peer1 = PeerId::random();
        let peer1_best = 20;
        let peer1_common = 10;

        // peer2's common number equals the start of the range assigned to peer1 below.
        let peer2 = PeerId::random();
        let peer2_best = 20;
        let peer2_common = 11;

        assert_eq!(
            bc.needed_blocks(peer1, count, peer1_best, peer1_common, max_parallel, max_ahead),
            Some(11..16),
        );
        // The range 11..16 is skipped for peer2; it gets the following range instead.
        assert_eq!(
            bc.needed_blocks(peer2, count, peer2_best, peer2_common, max_parallel, max_ahead),
            Some(16..21),
        );
    }
516
    // A gap between tracked ranges is requested when it starts above the peer's common number.
    #[test]
    fn gap_above_common_number_requested() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        let count = 5;
        let best = 30;
        // With `max_parallel = 1` a range being downloaded is never handed out twice, so each
        // request below starts a new range.
        let max_parallel = 1;
        let max_ahead = 200;

        let peer1 = PeerId::random();
        let peer2 = PeerId::random();
        let peer3 = PeerId::random();

        let common = 10;
        assert_eq!(
            bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
            Some(11..16),
        );
        assert_eq!(
            bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
            Some(16..21),
        );
        assert_eq!(
            bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
            Some(21..26),
        );

        // Dropping peer2's download leaves a gap at 16..21.
        bc.clear_peer_download(&peer2);

        // The gap starts above the common number (10), so it is requested again.
        assert_eq!(
            bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
            Some(16..21),
        );
    }
557
    // A gap between tracked ranges is skipped when it starts at or below the peer's common
    // number.
    #[test]
    fn gap_below_common_number_not_requested() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        let count = 5;
        let best = 30;
        // With `max_parallel = 1` a range being downloaded is never handed out twice, so each
        // request below starts a new range.
        let max_parallel = 1;
        let max_ahead = 200;

        let peer1 = PeerId::random();
        let peer2 = PeerId::random();
        let peer3 = PeerId::random();

        let common = 10;
        assert_eq!(
            bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
            Some(11..16),
        );
        assert_eq!(
            bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
            Some(16..21),
        );
        assert_eq!(
            bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
            Some(21..26),
        );

        // Dropping peer2's download leaves a gap at 16..21.
        bc.clear_peer_download(&peer2);

        // With the common number raised to 23 the gap lies below it and is skipped; the
        // request continues after the last tracked range instead.
        let common = 23;
        assert_eq!(
            bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
            Some(26..31),
        );
    }
599
    // A new range is appended directly after the last tracked range when that position is
    // above the peer's common number.
    #[test]
    fn range_at_the_end_above_common_number_requested() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        let count = 5;
        let best = 30;
        let max_parallel = 1;
        let max_ahead = 200;

        let peer1 = PeerId::random();
        let peer2 = PeerId::random();

        let common = 10;
        assert_eq!(
            bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
            Some(11..16),
        );
        // The end of the first range (16) is above the common number, so the next range
        // starts right there.
        assert_eq!(
            bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
            Some(16..21),
        );
    }
623
    // When the position after the last tracked range lies at or below the peer's common
    // number, the new range starts right after the common number instead.
    #[test]
    fn range_at_the_end_below_common_number_not_requested() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        let count = 5;
        let best = 30;
        let max_parallel = 1;
        let max_ahead = 200;

        let peer1 = PeerId::random();
        let peer2 = PeerId::random();

        let common = 10;
        assert_eq!(
            bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
            Some(11..16),
        );

        // With the common number at 20, continuing at 16 is not allowed; the request starts
        // at 21.
        let common = 20;
        assert_eq!(
            bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
            Some(21..26),
        );
    }
649}