sc_network_sync/strategy/
state_sync.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! State sync support.
20
21use crate::{
22	schema::v1::{StateEntry, StateRequest, StateResponse},
23	LOG_TARGET,
24};
25use codec::{Decode, Encode};
26use log::debug;
27use sc_client_api::{CompactProof, ProofProvider};
28use sc_consensus::ImportedState;
29use smallvec::SmallVec;
30use sp_core::storage::well_known_keys;
31use sp_runtime::{
32	traits::{Block as BlockT, Header, NumberFor},
33	Justifications,
34};
35use std::{collections::HashMap, fmt, sync::Arc};
36
37/// Generic state sync provider. Used for mocking in tests.
38pub trait StateSyncProvider<B: BlockT>: Send + Sync {
39	/// Validate and import a state response.
40	fn import(&mut self, response: StateResponse) -> ImportResult<B>;
41	/// Produce next state request.
42	fn next_request(&self) -> StateRequest;
43	/// Check if the state is complete.
44	fn is_complete(&self) -> bool;
45	/// Returns target block number.
46	fn target_number(&self) -> NumberFor<B>;
47	/// Returns target block hash.
48	fn target_hash(&self) -> B::Hash;
49	/// Returns state sync estimated progress.
50	fn progress(&self) -> StateSyncProgress;
51}
52
53// Reported state sync phase.
54#[derive(Clone, Eq, PartialEq, Debug)]
55pub enum StateSyncPhase {
56	// State download in progress.
57	DownloadingState,
58	// Download is complete, state is being imported.
59	ImportingState,
60}
61
62impl fmt::Display for StateSyncPhase {
63	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
64		match self {
65			Self::DownloadingState => write!(f, "Downloading state"),
66			Self::ImportingState => write!(f, "Importing state"),
67		}
68	}
69}
70
71/// Reported state download progress.
72#[derive(Clone, Eq, PartialEq, Debug)]
73pub struct StateSyncProgress {
74	/// Estimated download percentage.
75	pub percentage: u32,
76	/// Total state size in bytes downloaded so far.
77	pub size: u64,
78	/// Current state sync phase.
79	pub phase: StateSyncPhase,
80}
81
82/// Import state chunk result.
83pub enum ImportResult<B: BlockT> {
84	/// State is complete and ready for import.
85	Import(B::Hash, B::Header, ImportedState<B>, Option<Vec<B::Extrinsic>>, Option<Justifications>),
86	/// Continue downloading.
87	Continue,
88	/// Bad state chunk.
89	BadResponse,
90}
91
92/// State sync state machine. Accumulates partial state data until it
93/// is ready to be imported.
94pub struct StateSync<B: BlockT, Client> {
95	target_block: B::Hash,
96	target_header: B::Header,
97	target_root: B::Hash,
98	target_body: Option<Vec<B::Extrinsic>>,
99	target_justifications: Option<Justifications>,
100	last_key: SmallVec<[Vec<u8>; 2]>,
101	state: HashMap<Vec<u8>, (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>)>,
102	complete: bool,
103	client: Arc<Client>,
104	imported_bytes: u64,
105	skip_proof: bool,
106}
107
108impl<B, Client> StateSync<B, Client>
109where
110	B: BlockT,
111	Client: ProofProvider<B> + Send + Sync + 'static,
112{
113	///  Create a new instance.
114	pub fn new(
115		client: Arc<Client>,
116		target_header: B::Header,
117		target_body: Option<Vec<B::Extrinsic>>,
118		target_justifications: Option<Justifications>,
119		skip_proof: bool,
120	) -> Self {
121		Self {
122			client,
123			target_block: target_header.hash(),
124			target_root: *target_header.state_root(),
125			target_header,
126			target_body,
127			target_justifications,
128			last_key: SmallVec::default(),
129			state: HashMap::default(),
130			complete: false,
131			imported_bytes: 0,
132			skip_proof,
133		}
134	}
135}
136
137impl<B, Client> StateSyncProvider<B> for StateSync<B, Client>
138where
139	B: BlockT,
140	Client: ProofProvider<B> + Send + Sync + 'static,
141{
142	///  Validate and import a state response.
143	fn import(&mut self, response: StateResponse) -> ImportResult<B> {
144		if response.entries.is_empty() && response.proof.is_empty() {
145			debug!(target: LOG_TARGET, "Bad state response");
146			return ImportResult::BadResponse
147		}
148		if !self.skip_proof && response.proof.is_empty() {
149			debug!(target: LOG_TARGET, "Missing proof");
150			return ImportResult::BadResponse
151		}
152		let complete = if !self.skip_proof {
153			debug!(target: LOG_TARGET, "Importing state from {} trie nodes", response.proof.len());
154			let proof_size = response.proof.len() as u64;
155			let proof = match CompactProof::decode(&mut response.proof.as_ref()) {
156				Ok(proof) => proof,
157				Err(e) => {
158					debug!(target: LOG_TARGET, "Error decoding proof: {:?}", e);
159					return ImportResult::BadResponse
160				},
161			};
162			let (values, completed) = match self.client.verify_range_proof(
163				self.target_root,
164				proof,
165				self.last_key.as_slice(),
166			) {
167				Err(e) => {
168					debug!(
169						target: LOG_TARGET,
170						"StateResponse failed proof verification: {}",
171						e,
172					);
173					return ImportResult::BadResponse
174				},
175				Ok(values) => values,
176			};
177			debug!(target: LOG_TARGET, "Imported with {} keys", values.len());
178
179			let complete = completed == 0;
180			if !complete && !values.update_last_key(completed, &mut self.last_key) {
181				debug!(target: LOG_TARGET, "Error updating key cursor, depth: {}", completed);
182			};
183
184			for values in values.0 {
185				let key_values = if values.state_root.is_empty() {
186					// Read child trie roots.
187					values
188						.key_values
189						.into_iter()
190						.filter(|key_value| {
191							if well_known_keys::is_child_storage_key(key_value.0.as_slice()) {
192								self.state
193									.entry(key_value.1.clone())
194									.or_default()
195									.1
196									.push(key_value.0.clone());
197								false
198							} else {
199								true
200							}
201						})
202						.collect()
203				} else {
204					values.key_values
205				};
206				let entry = self.state.entry(values.state_root).or_default();
207				if entry.0.len() > 0 && entry.1.len() > 1 {
208					// Already imported child_trie with same root.
209					// Warning this will not work with parallel download.
210				} else if entry.0.is_empty() {
211					for (key, _value) in key_values.iter() {
212						self.imported_bytes += key.len() as u64;
213					}
214
215					entry.0 = key_values;
216				} else {
217					for (key, value) in key_values {
218						self.imported_bytes += key.len() as u64;
219						entry.0.push((key, value))
220					}
221				}
222			}
223			self.imported_bytes += proof_size;
224			complete
225		} else {
226			let mut complete = true;
227			// if the trie is a child trie and one of its parent trie is empty,
228			// the parent cursor stays valid.
229			// Empty parent trie content only happens when all the response content
230			// is part of a single child trie.
231			if self.last_key.len() == 2 && response.entries[0].entries.is_empty() {
232				// Do not remove the parent trie position.
233				self.last_key.pop();
234			} else {
235				self.last_key.clear();
236			}
237			for state in response.entries {
238				debug!(
239					target: LOG_TARGET,
240					"Importing state from {:?} to {:?}",
241					state.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
242					state.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
243				);
244
245				if !state.complete {
246					if let Some(e) = state.entries.last() {
247						self.last_key.push(e.key.clone());
248					}
249					complete = false;
250				}
251				let is_top = state.state_root.is_empty();
252				let entry = self.state.entry(state.state_root).or_default();
253				if entry.0.len() > 0 && entry.1.len() > 1 {
254					// Already imported child trie with same root.
255				} else {
256					let mut child_roots = Vec::new();
257					for StateEntry { key, value } in state.entries {
258						// Skip all child key root (will be recalculated on import).
259						if is_top && well_known_keys::is_child_storage_key(key.as_slice()) {
260							child_roots.push((value, key));
261						} else {
262							self.imported_bytes += key.len() as u64;
263							entry.0.push((key, value))
264						}
265					}
266					for (root, storage_key) in child_roots {
267						self.state.entry(root).or_default().1.push(storage_key);
268					}
269				}
270			}
271			complete
272		};
273		if complete {
274			self.complete = true;
275			ImportResult::Import(
276				self.target_block,
277				self.target_header.clone(),
278				ImportedState {
279					block: self.target_block,
280					state: std::mem::take(&mut self.state).into(),
281				},
282				self.target_body.clone(),
283				self.target_justifications.clone(),
284			)
285		} else {
286			ImportResult::Continue
287		}
288	}
289
290	/// Produce next state request.
291	fn next_request(&self) -> StateRequest {
292		StateRequest {
293			block: self.target_block.encode(),
294			start: self.last_key.clone().into_vec(),
295			no_proof: self.skip_proof,
296		}
297	}
298
299	/// Check if the state is complete.
300	fn is_complete(&self) -> bool {
301		self.complete
302	}
303
304	/// Returns target block number.
305	fn target_number(&self) -> NumberFor<B> {
306		*self.target_header.number()
307	}
308
309	/// Returns target block hash.
310	fn target_hash(&self) -> B::Hash {
311		self.target_block
312	}
313
314	/// Returns state sync estimated progress.
315	fn progress(&self) -> StateSyncProgress {
316		let cursor = *self.last_key.get(0).and_then(|last| last.get(0)).unwrap_or(&0u8);
317		let percent_done = cursor as u32 * 100 / 256;
318		StateSyncProgress {
319			percentage: percent_done,
320			size: self.imported_bytes,
321			phase: if self.complete {
322				StateSyncPhase::ImportingState
323			} else {
324				StateSyncPhase::DownloadingState
325			},
326		}
327	}
328}