sc_network_sync/
blocks.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::LOG_TARGET;
20use log::trace;
21use sc_network_common::sync::message;
22use sc_network_types::PeerId;
23use sp_runtime::traits::{Block as BlockT, NumberFor, One};
24use std::{
25	cmp,
26	collections::{BTreeMap, HashMap},
27	ops::Range,
28};
29
/// Block data with origin: a block message paired with the peer that delivered it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData<B: BlockT> {
	/// The block message as received from the wire.
	pub block: message::BlockData<B>,
	/// The peer this block was received from; `None` when no originating peer is recorded.
	pub origin: Option<PeerId>,
}
38
/// State of one contiguous range of blocks tracked by `BlockCollection`.
///
/// The first block number of the range is not stored here; it is the key under which the
/// state is kept in the collection.
#[derive(Debug)]
enum BlockRangeState<B: BlockT> {
	/// The range has been handed out for download. `len` is the number of blocks in the
	/// range and `downloading` counts the requests currently recorded for it.
	Downloading { len: NumberFor<B>, downloading: u32 },
	/// The blocks of the range have been received and not yet handed out for import.
	Complete(Vec<BlockData<B>>),
	/// The blocks of the range have been handed out by `ready_blocks`; only the length of
	/// the range is retained.
	Queued { len: NumberFor<B> },
}
45
46impl<B: BlockT> BlockRangeState<B> {
47	pub fn len(&self) -> NumberFor<B> {
48		match *self {
49			Self::Downloading { len, .. } => len,
50			Self::Complete(ref blocks) => (blocks.len() as u32).into(),
51			Self::Queued { len } => len,
52		}
53	}
54}
55
/// A collection of blocks being downloaded.
///
/// Tracks block ranges through their life cycle: requested from peers, received, and handed
/// out for import.
#[derive(Default)]
pub struct BlockCollection<B: BlockT> {
	/// Tracked block ranges and their state, keyed by the number of the first block of each
	/// range.
	blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
	/// For each peer, the start number of the range most recently requested from it.
	peer_requests: HashMap<PeerId, NumberFor<B>>,
	/// Block ranges downloaded and queued for import.
	/// Maps start_hash => (start_num, end_num), where `end_num` is exclusive.
	queued_blocks: HashMap<B::Hash, (NumberFor<B>, NumberFor<B>)>,
}
66
67impl<B: BlockT> BlockCollection<B> {
68	/// Create a new instance.
69	pub fn new() -> Self {
70		Self {
71			blocks: BTreeMap::new(),
72			peer_requests: HashMap::new(),
73			queued_blocks: HashMap::new(),
74		}
75	}
76
77	/// Clear everything.
78	pub fn clear(&mut self) {
79		self.blocks.clear();
80		self.peer_requests.clear();
81	}
82
83	/// Insert a set of blocks into collection.
84	pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: PeerId) {
85		if blocks.is_empty() {
86			return
87		}
88
89		match self.blocks.get(&start) {
90			Some(&BlockRangeState::Downloading { .. }) => {
91				trace!(target: LOG_TARGET, "Inserting block data still marked as being downloaded: {}", start);
92			},
93			Some(BlockRangeState::Complete(existing)) if existing.len() >= blocks.len() => {
94				trace!(target: LOG_TARGET, "Ignored block data already downloaded: {}", start);
95				return
96			},
97			_ => (),
98		}
99
100		self.blocks.insert(
101			start,
102			BlockRangeState::Complete(
103				blocks.into_iter().map(|b| BlockData { origin: Some(who), block: b }).collect(),
104			),
105		);
106	}
107
108	/// Returns a set of block hashes that require a header download. The returned set is marked as
109	/// being downloaded.
110	pub fn needed_blocks(
111		&mut self,
112		who: PeerId,
113		count: u32,
114		peer_best: NumberFor<B>,
115		common: NumberFor<B>,
116		max_parallel: u32,
117		max_ahead: u32,
118	) -> Option<Range<NumberFor<B>>> {
119		if peer_best <= common {
120			// Bail out early
121			return None
122		}
123		// First block number that we need to download
124		let first_different = common + <NumberFor<B>>::one();
125		let count = (count as u32).into();
126		let (mut range, downloading) = {
127			// Iterate through the ranges in `self.blocks` looking for a range to download
128			let mut downloading_iter = self.blocks.iter().peekable();
129			let mut prev: Option<(&NumberFor<B>, &BlockRangeState<B>)> = None;
130			loop {
131				let next = downloading_iter.next();
132				break match (prev, next) {
133					// If we are already downloading this range, request it from `max_parallel`
134					// peers (`max_parallel = 5` by default).
135					// Do not request already downloading range from peers with common number above
136					// the range start.
137					(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
138						if downloading < max_parallel && *start >= first_different =>
139						(*start..*start + *len, downloading),
140					// If there is a gap between ranges requested, download this gap unless the peer
141					// has common number above the gap start
142					(Some((start, r)), Some((next_start, _)))
143						if *start + r.len() < *next_start &&
144							*start + r.len() >= first_different =>
145						(*start + r.len()..cmp::min(*next_start, *start + r.len() + count), 0),
146					// Download `count` blocks after the last range requested unless the peer
147					// has common number above this new range
148					(Some((start, r)), None) if *start + r.len() >= first_different =>
149						(*start + r.len()..*start + r.len() + count, 0),
150					// If there are no ranges currently requested, download `count` blocks after
151					// `common` number
152					(None, None) => (first_different..first_different + count, 0),
153					// If the first range starts above `common + 1`, download the gap at the start
154					(None, Some((start, _))) if *start > first_different =>
155						(first_different..cmp::min(first_different + count, *start), 0),
156					// Move on to the next range pair
157					_ => {
158						prev = next;
159						continue
160					},
161				}
162			}
163		};
164		// crop to peers best
165		if range.start > peer_best {
166			trace!(target: LOG_TARGET, "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
167			return None
168		}
169		range.end = cmp::min(peer_best + One::one(), range.end);
170
171		if self
172			.blocks
173			.iter()
174			.next()
175			.map_or(false, |(n, _)| range.start > *n + max_ahead.into())
176		{
177			trace!(target: LOG_TARGET, "Too far ahead for peer {} ({})", who, range.start);
178			return None
179		}
180
181		self.peer_requests.insert(who, range.start);
182		self.blocks.insert(
183			range.start,
184			BlockRangeState::Downloading {
185				len: range.end - range.start,
186				downloading: downloading + 1,
187			},
188		);
189		if range.end <= range.start {
190			panic!(
191				"Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}",
192				range, count, peer_best, common, self.blocks
193			);
194		}
195		Some(range)
196	}
197
198	/// Get a valid chain of blocks ordered in descending order and ready for importing into
199	/// the blockchain.
200	/// `from` is the maximum block number for the start of the range that we are interested in.
201	/// The function will return empty Vec if the first block ready is higher than `from`.
202	/// For each returned block hash `clear_queued` must be called at some later stage.
203	pub fn ready_blocks(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
204		let mut ready = Vec::new();
205
206		let mut prev = from;
207		for (&start, range_data) in &mut self.blocks {
208			if start > prev {
209				break
210			}
211			let len = match range_data {
212				BlockRangeState::Complete(blocks) => {
213					let len = (blocks.len() as u32).into();
214					prev = start + len;
215					if let Some(BlockData { block, .. }) = blocks.first() {
216						self.queued_blocks
217							.insert(block.hash, (start, start + (blocks.len() as u32).into()));
218					}
219					// Remove all elements from `blocks` and add them to `ready`
220					ready.append(blocks);
221					len
222				},
223				BlockRangeState::Queued { .. } => continue,
224				_ => break,
225			};
226			*range_data = BlockRangeState::Queued { len };
227		}
228		trace!(target: LOG_TARGET, "{} blocks ready for import", ready.len());
229		ready
230	}
231
232	pub fn clear_queued(&mut self, hash: &B::Hash) {
233		if let Some((from, to)) = self.queued_blocks.remove(hash) {
234			let mut block_num = from;
235			while block_num < to {
236				self.blocks.remove(&block_num);
237				block_num += One::one();
238			}
239			trace!(target: LOG_TARGET, "Cleared blocks from {:?} to {:?}", from, to);
240		}
241	}
242
243	pub fn clear_peer_download(&mut self, who: &PeerId) {
244		if let Some(start) = self.peer_requests.remove(who) {
245			let remove = match self.blocks.get_mut(&start) {
246				Some(&mut BlockRangeState::Downloading { ref mut downloading, .. })
247					if *downloading > 1 =>
248				{
249					*downloading -= 1;
250					false
251				},
252				Some(&mut BlockRangeState::Downloading { .. }) => true,
253				_ => false,
254			};
255			if remove {
256				self.blocks.remove(&start);
257			}
258		}
259	}
260}
261
262#[cfg(test)]
263mod test {
264	use super::{BlockCollection, BlockData, BlockRangeState};
265	use sc_network_common::sync::message;
266	use sc_network_types::PeerId;
267	use sp_core::H256;
268	use sp_runtime::testing::{Block as RawBlock, MockCallU64, TestXt};
269
270	type Block = RawBlock<TestXt<MockCallU64, ()>>;
271
272	fn is_empty(bc: &BlockCollection<Block>) -> bool {
273		bc.blocks.is_empty() && bc.peer_requests.is_empty()
274	}
275
276	fn generate_blocks(n: usize) -> Vec<message::BlockData<Block>> {
277		(0..n)
278			.map(|_| message::generic::BlockData {
279				hash: H256::random(),
280				header: None,
281				body: None,
282				indexed_body: None,
283				message_queue: None,
284				receipt: None,
285				justification: None,
286				justifications: None,
287			})
288			.collect()
289	}
290
291	#[test]
292	fn create_clear() {
293		let mut bc = BlockCollection::new();
294		assert!(is_empty(&bc));
295		bc.insert(1, generate_blocks(100), PeerId::random());
296		assert!(!is_empty(&bc));
297		bc.clear();
298		assert!(is_empty(&bc));
299	}
300
301	#[test]
302	fn insert_blocks() {
303		let mut bc = BlockCollection::new();
304		assert!(is_empty(&bc));
305		let peer0 = PeerId::random();
306		let peer1 = PeerId::random();
307		let peer2 = PeerId::random();
308
309		let blocks = generate_blocks(150);
310		assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(1..41));
311		assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(41..81));
312		assert_eq!(bc.needed_blocks(peer2, 40, 150, 0, 1, 200), Some(81..121));
313
314		bc.clear_peer_download(&peer1);
315		bc.insert(41, blocks[41..81].to_vec(), peer1);
316		assert_eq!(bc.ready_blocks(1), vec![]);
317		assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(121..151));
318		bc.clear_peer_download(&peer0);
319		bc.insert(1, blocks[1..11].to_vec(), peer0);
320
321		assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(11..41));
322		assert_eq!(
323			bc.ready_blocks(1),
324			blocks[1..11]
325				.iter()
326				.map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
327				.collect::<Vec<_>>()
328		);
329
330		bc.clear_peer_download(&peer0);
331		bc.insert(11, blocks[11..41].to_vec(), peer0);
332
333		let ready = bc.ready_blocks(12);
334		assert_eq!(
335			ready[..30],
336			blocks[11..41]
337				.iter()
338				.map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
339				.collect::<Vec<_>>()[..]
340		);
341		assert_eq!(
342			ready[30..],
343			blocks[41..81]
344				.iter()
345				.map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
346				.collect::<Vec<_>>()[..]
347		);
348
349		bc.clear_peer_download(&peer2);
350		assert_eq!(bc.needed_blocks(peer2, 40, 150, 80, 1, 200), Some(81..121));
351		bc.clear_peer_download(&peer2);
352		bc.insert(81, blocks[81..121].to_vec(), peer2);
353		bc.clear_peer_download(&peer1);
354		bc.insert(121, blocks[121..150].to_vec(), peer1);
355
356		assert_eq!(bc.ready_blocks(80), vec![]);
357		let ready = bc.ready_blocks(81);
358		assert_eq!(
359			ready[..40],
360			blocks[81..121]
361				.iter()
362				.map(|b| BlockData { block: b.clone(), origin: Some(peer2) })
363				.collect::<Vec<_>>()[..]
364		);
365		assert_eq!(
366			ready[40..],
367			blocks[121..150]
368				.iter()
369				.map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
370				.collect::<Vec<_>>()[..]
371		);
372	}
373
374	#[test]
375	fn large_gap() {
376		let mut bc: BlockCollection<Block> = BlockCollection::new();
377		bc.blocks.insert(100, BlockRangeState::Downloading { len: 128, downloading: 1 });
378		let blocks = generate_blocks(10)
379			.into_iter()
380			.map(|b| BlockData { block: b, origin: None })
381			.collect();
382		bc.blocks.insert(114305, BlockRangeState::Complete(blocks));
383
384		let peer0 = PeerId::random();
385		assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), Some(1..100));
386		assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), None); // too far ahead
387		assert_eq!(
388			bc.needed_blocks(peer0, 128, 10000, 0, 1, 200000),
389			Some(100 + 128..100 + 128 + 128)
390		);
391	}
392
393	#[test]
394	fn no_duplicate_requests_on_fork() {
395		let mut bc = BlockCollection::new();
396		assert!(is_empty(&bc));
397		let peer = PeerId::random();
398
399		let blocks = generate_blocks(10);
400
401		// count = 5, peer_best = 50, common = 39, max_parallel = 0, max_ahead = 200
402		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
403
404		// got a response on the request for `40..45`
405		bc.clear_peer_download(&peer);
406		bc.insert(40, blocks[..5].to_vec(), peer);
407
408		// our "node" started on a fork, with its current best = 47, which is > common
409		let ready = bc.ready_blocks(48);
410		assert_eq!(
411			ready,
412			blocks[..5]
413				.iter()
414				.map(|b| BlockData { block: b.clone(), origin: Some(peer) })
415				.collect::<Vec<_>>()
416		);
417
418		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
419	}
420
421	#[test]
422	fn clear_queued_subsequent_ranges() {
423		let mut bc = BlockCollection::new();
424		assert!(is_empty(&bc));
425		let peer = PeerId::random();
426
427		let blocks = generate_blocks(10);
428
429		// Request 2 ranges
430		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
431		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
432
433		// got a response on the request for `40..50`
434		bc.clear_peer_download(&peer);
435		bc.insert(40, blocks.to_vec(), peer);
436
437		// request any blocks starting from 1000 or lower.
438		let ready = bc.ready_blocks(1000);
439		assert_eq!(
440			ready,
441			blocks
442				.iter()
443				.map(|b| BlockData { block: b.clone(), origin: Some(peer) })
444				.collect::<Vec<_>>()
445		);
446
447		bc.clear_queued(&blocks[0].hash);
448		assert!(bc.blocks.is_empty());
449		assert!(bc.queued_blocks.is_empty());
450	}
451
452	#[test]
453	fn downloaded_range_is_requested_from_max_parallel_peers() {
454		let mut bc = BlockCollection::new();
455		assert!(is_empty(&bc));
456
457		let count = 5;
458		// identical ranges requested from 2 peers
459		let max_parallel = 2;
460		let max_ahead = 200;
461
462		let peer1 = PeerId::random();
463		let peer2 = PeerId::random();
464		let peer3 = PeerId::random();
465
466		// common for all peers
467		let best = 100;
468		let common = 10;
469
470		assert_eq!(
471			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
472			Some(11..16)
473		);
474		assert_eq!(
475			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
476			Some(11..16)
477		);
478		assert_eq!(
479			bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
480			Some(16..21)
481		);
482	}
483	#[test]
484	fn downloaded_range_not_requested_from_peers_with_higher_common_number() {
485		// A peer connects with a common number falling behind our best number
486		// (either a fork or lagging behind).
487		// We request a range from this peer starting at its common number + 1.
488		// Even though we have less than `max_parallel` downloads, we do not request
489		// this range from peers with a common number above the start of this range.
490
491		let mut bc = BlockCollection::new();
492		assert!(is_empty(&bc));
493
494		let count = 5;
495		let max_parallel = 2;
496		let max_ahead = 200;
497
498		let peer1 = PeerId::random();
499		let peer1_best = 20;
500		let peer1_common = 10;
501
502		// `peer2` has first different above the start of the range downloaded from `peer1`
503		let peer2 = PeerId::random();
504		let peer2_best = 20;
505		let peer2_common = 11; // first_different = 12
506
507		assert_eq!(
508			bc.needed_blocks(peer1, count, peer1_best, peer1_common, max_parallel, max_ahead),
509			Some(11..16),
510		);
511		assert_eq!(
512			bc.needed_blocks(peer2, count, peer2_best, peer2_common, max_parallel, max_ahead),
513			Some(16..21),
514		);
515	}
516
517	#[test]
518	fn gap_above_common_number_requested() {
519		let mut bc = BlockCollection::new();
520		assert!(is_empty(&bc));
521
522		let count = 5;
523		let best = 30;
524		// We need at least 3 ranges requested to have a gap, so to minimize the number of peers
525		// set `max_parallel = 1`
526		let max_parallel = 1;
527		let max_ahead = 200;
528
529		let peer1 = PeerId::random();
530		let peer2 = PeerId::random();
531		let peer3 = PeerId::random();
532
533		let common = 10;
534		assert_eq!(
535			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
536			Some(11..16),
537		);
538		assert_eq!(
539			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
540			Some(16..21),
541		);
542		assert_eq!(
543			bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
544			Some(21..26),
545		);
546
547		// For some reason there is now a gap at 16..21. We just disconnect `peer2`, but it might
548		// also happen that 16..21 received first and got imported if our best is actually >= 15.
549		bc.clear_peer_download(&peer2);
550
551		// Some peer connects with common number below the gap. The gap is requested from it.
552		assert_eq!(
553			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
554			Some(16..21),
555		);
556	}
557
558	#[test]
559	fn gap_below_common_number_not_requested() {
560		let mut bc = BlockCollection::new();
561		assert!(is_empty(&bc));
562
563		let count = 5;
564		let best = 30;
565		// We need at least 3 ranges requested to have a gap, so to minimize the number of peers
566		// set `max_parallel = 1`
567		let max_parallel = 1;
568		let max_ahead = 200;
569
570		let peer1 = PeerId::random();
571		let peer2 = PeerId::random();
572		let peer3 = PeerId::random();
573
574		let common = 10;
575		assert_eq!(
576			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
577			Some(11..16),
578		);
579		assert_eq!(
580			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
581			Some(16..21),
582		);
583		assert_eq!(
584			bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
585			Some(21..26),
586		);
587
588		// For some reason there is now a gap at 16..21. We just disconnect `peer2`, but it might
589		// also happen that 16..21 received first and got imported if our best is actually >= 15.
590		bc.clear_peer_download(&peer2);
591
592		// Some peer connects with common number above the gap. The gap is not requested from it.
593		let common = 23;
594		assert_eq!(
595			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
596			Some(26..31), // not 16..21
597		);
598	}
599
600	#[test]
601	fn range_at_the_end_above_common_number_requested() {
602		let mut bc = BlockCollection::new();
603		assert!(is_empty(&bc));
604
605		let count = 5;
606		let best = 30;
607		let max_parallel = 1;
608		let max_ahead = 200;
609
610		let peer1 = PeerId::random();
611		let peer2 = PeerId::random();
612
613		let common = 10;
614		assert_eq!(
615			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
616			Some(11..16),
617		);
618		assert_eq!(
619			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
620			Some(16..21),
621		);
622	}
623
624	#[test]
625	fn range_at_the_end_below_common_number_not_requested() {
626		let mut bc = BlockCollection::new();
627		assert!(is_empty(&bc));
628
629		let count = 5;
630		let best = 30;
631		let max_parallel = 1;
632		let max_ahead = 200;
633
634		let peer1 = PeerId::random();
635		let peer2 = PeerId::random();
636
637		let common = 10;
638		assert_eq!(
639			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
640			Some(11..16),
641		);
642
643		let common = 20;
644		assert_eq!(
645			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
646			Some(21..26), // not 16..21
647		);
648	}
649}