sc_network_sync/strategy/
state_sync.rs1use crate::{
22 schema::v1::{StateEntry, StateRequest, StateResponse},
23 LOG_TARGET,
24};
25use codec::{Decode, Encode};
26use log::debug;
27use sc_client_api::{CompactProof, ProofProvider};
28use sc_consensus::ImportedState;
29use smallvec::SmallVec;
30use sp_core::storage::well_known_keys;
31use sp_runtime::{
32 traits::{Block as BlockT, Header, NumberFor},
33 Justifications,
34};
35use std::{collections::HashMap, fmt, sync::Arc};
36
37pub trait StateSyncProvider<B: BlockT>: Send + Sync {
39 fn import(&mut self, response: StateResponse) -> ImportResult<B>;
41 fn next_request(&self) -> StateRequest;
43 fn is_complete(&self) -> bool;
45 fn target_number(&self) -> NumberFor<B>;
47 fn target_hash(&self) -> B::Hash;
49 fn progress(&self) -> StateSyncProgress;
51}
52
/// Stage of the state sync as reported in [`StateSyncProgress`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StateSyncPhase {
    /// State is still being downloaded.
    DownloadingState,
    /// The download has finished and the state is being imported.
    ImportingState,
}

impl fmt::Display for StateSyncPhase {
    /// Writes the human-readable name of the phase.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::DownloadingState => "Downloading state",
            Self::ImportingState => "Importing state",
        };
        f.write_str(label)
    }
}
70
71#[derive(Clone, Eq, PartialEq, Debug)]
73pub struct StateSyncProgress {
74 pub percentage: u32,
76 pub size: u64,
78 pub phase: StateSyncPhase,
80}
81
82pub enum ImportResult<B: BlockT> {
84 Import(B::Hash, B::Header, ImportedState<B>, Option<Vec<B::Extrinsic>>, Option<Justifications>),
86 Continue,
88 BadResponse,
90}
91
92pub struct StateSync<B: BlockT, Client> {
95 target_block: B::Hash,
96 target_header: B::Header,
97 target_root: B::Hash,
98 target_body: Option<Vec<B::Extrinsic>>,
99 target_justifications: Option<Justifications>,
100 last_key: SmallVec<[Vec<u8>; 2]>,
101 state: HashMap<Vec<u8>, (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>)>,
102 complete: bool,
103 client: Arc<Client>,
104 imported_bytes: u64,
105 skip_proof: bool,
106}
107
108impl<B, Client> StateSync<B, Client>
109where
110 B: BlockT,
111 Client: ProofProvider<B> + Send + Sync + 'static,
112{
113 pub fn new(
115 client: Arc<Client>,
116 target_header: B::Header,
117 target_body: Option<Vec<B::Extrinsic>>,
118 target_justifications: Option<Justifications>,
119 skip_proof: bool,
120 ) -> Self {
121 Self {
122 client,
123 target_block: target_header.hash(),
124 target_root: *target_header.state_root(),
125 target_header,
126 target_body,
127 target_justifications,
128 last_key: SmallVec::default(),
129 state: HashMap::default(),
130 complete: false,
131 imported_bytes: 0,
132 skip_proof,
133 }
134 }
135}
136
137impl<B, Client> StateSyncProvider<B> for StateSync<B, Client>
138where
139 B: BlockT,
140 Client: ProofProvider<B> + Send + Sync + 'static,
141{
142 fn import(&mut self, response: StateResponse) -> ImportResult<B> {
144 if response.entries.is_empty() && response.proof.is_empty() {
145 debug!(target: LOG_TARGET, "Bad state response");
146 return ImportResult::BadResponse
147 }
148 if !self.skip_proof && response.proof.is_empty() {
149 debug!(target: LOG_TARGET, "Missing proof");
150 return ImportResult::BadResponse
151 }
152 let complete = if !self.skip_proof {
153 debug!(target: LOG_TARGET, "Importing state from {} trie nodes", response.proof.len());
154 let proof_size = response.proof.len() as u64;
155 let proof = match CompactProof::decode(&mut response.proof.as_ref()) {
156 Ok(proof) => proof,
157 Err(e) => {
158 debug!(target: LOG_TARGET, "Error decoding proof: {:?}", e);
159 return ImportResult::BadResponse
160 },
161 };
162 let (values, completed) = match self.client.verify_range_proof(
163 self.target_root,
164 proof,
165 self.last_key.as_slice(),
166 ) {
167 Err(e) => {
168 debug!(
169 target: LOG_TARGET,
170 "StateResponse failed proof verification: {}",
171 e,
172 );
173 return ImportResult::BadResponse
174 },
175 Ok(values) => values,
176 };
177 debug!(target: LOG_TARGET, "Imported with {} keys", values.len());
178
179 let complete = completed == 0;
180 if !complete && !values.update_last_key(completed, &mut self.last_key) {
181 debug!(target: LOG_TARGET, "Error updating key cursor, depth: {}", completed);
182 };
183
184 for values in values.0 {
185 let key_values = if values.state_root.is_empty() {
186 values
188 .key_values
189 .into_iter()
190 .filter(|key_value| {
191 if well_known_keys::is_child_storage_key(key_value.0.as_slice()) {
192 self.state
193 .entry(key_value.1.clone())
194 .or_default()
195 .1
196 .push(key_value.0.clone());
197 false
198 } else {
199 true
200 }
201 })
202 .collect()
203 } else {
204 values.key_values
205 };
206 let entry = self.state.entry(values.state_root).or_default();
207 if entry.0.len() > 0 && entry.1.len() > 1 {
208 } else if entry.0.is_empty() {
211 for (key, _value) in key_values.iter() {
212 self.imported_bytes += key.len() as u64;
213 }
214
215 entry.0 = key_values;
216 } else {
217 for (key, value) in key_values {
218 self.imported_bytes += key.len() as u64;
219 entry.0.push((key, value))
220 }
221 }
222 }
223 self.imported_bytes += proof_size;
224 complete
225 } else {
226 let mut complete = true;
227 if self.last_key.len() == 2 && response.entries[0].entries.is_empty() {
232 self.last_key.pop();
234 } else {
235 self.last_key.clear();
236 }
237 for state in response.entries {
238 debug!(
239 target: LOG_TARGET,
240 "Importing state from {:?} to {:?}",
241 state.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
242 state.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
243 );
244
245 if !state.complete {
246 if let Some(e) = state.entries.last() {
247 self.last_key.push(e.key.clone());
248 }
249 complete = false;
250 }
251 let is_top = state.state_root.is_empty();
252 let entry = self.state.entry(state.state_root).or_default();
253 if entry.0.len() > 0 && entry.1.len() > 1 {
254 } else {
256 let mut child_roots = Vec::new();
257 for StateEntry { key, value } in state.entries {
258 if is_top && well_known_keys::is_child_storage_key(key.as_slice()) {
260 child_roots.push((value, key));
261 } else {
262 self.imported_bytes += key.len() as u64;
263 entry.0.push((key, value))
264 }
265 }
266 for (root, storage_key) in child_roots {
267 self.state.entry(root).or_default().1.push(storage_key);
268 }
269 }
270 }
271 complete
272 };
273 if complete {
274 self.complete = true;
275 ImportResult::Import(
276 self.target_block,
277 self.target_header.clone(),
278 ImportedState {
279 block: self.target_block,
280 state: std::mem::take(&mut self.state).into(),
281 },
282 self.target_body.clone(),
283 self.target_justifications.clone(),
284 )
285 } else {
286 ImportResult::Continue
287 }
288 }
289
290 fn next_request(&self) -> StateRequest {
292 StateRequest {
293 block: self.target_block.encode(),
294 start: self.last_key.clone().into_vec(),
295 no_proof: self.skip_proof,
296 }
297 }
298
299 fn is_complete(&self) -> bool {
301 self.complete
302 }
303
304 fn target_number(&self) -> NumberFor<B> {
306 *self.target_header.number()
307 }
308
309 fn target_hash(&self) -> B::Hash {
311 self.target_block
312 }
313
314 fn progress(&self) -> StateSyncProgress {
316 let cursor = *self.last_key.get(0).and_then(|last| last.get(0)).unwrap_or(&0u8);
317 let percent_done = cursor as u32 * 100 / 256;
318 StateSyncProgress {
319 percentage: percent_done,
320 size: self.imported_bytes,
321 phase: if self.complete {
322 StateSyncPhase::ImportingState
323 } else {
324 StateSyncPhase::DownloadingState
325 },
326 }
327 }
328}