1use {
2 crate::{
3 accounts::Accounts,
4 accounts_db::{
5 AccountShrinkThreshold, AccountStorageEntry, AccountsDb, AccountsDbConfig, AppendVecId,
6 AtomicAppendVecId, BankHashInfo, IndexGenerationInfo, SnapshotStorage,
7 },
8 accounts_index::AccountSecondaryIndexes,
9 accounts_update_notifier_interface::AccountsUpdateNotifier,
10 append_vec::{AppendVec, StoredMetaWriteVersion},
11 bank::{Bank, BankFieldsToDeserialize, BankIncrementalSnapshotPersistence, BankRc},
12 blockhash_queue::BlockhashQueue,
13 builtins::Builtins,
14 epoch_stakes::EpochStakes,
15 hardened_unpack::UnpackedAppendVecMap,
16 rent_collector::RentCollector,
17 serde_snapshot::storage::SerializableAccountStorageEntry,
18 snapshot_utils::{self, BANK_SNAPSHOT_PRE_FILENAME_EXTENSION},
19 stakes::Stakes,
20 },
21 bincode::{self, config::Options, Error},
22 log::*,
23 rayon::prelude::*,
24 serde::{de::DeserializeOwned, Deserialize, Serialize},
25 safecoin_measure::{measure, measure::Measure},
26 solana_sdk::{
27 clock::{Epoch, Slot, UnixTimestamp},
28 deserialize_utils::default_on_eof,
29 epoch_schedule::EpochSchedule,
30 fee_calculator::{FeeCalculator, FeeRateGovernor},
31 genesis_config::GenesisConfig,
32 hard_forks::HardForks,
33 hash::Hash,
34 inflation::Inflation,
35 pubkey::Pubkey,
36 },
37 std::{
38 collections::{HashMap, HashSet},
39 io::{self, BufReader, BufWriter, Read, Write},
40 path::{Path, PathBuf},
41 result::Result,
42 sync::{
43 atomic::{AtomicUsize, Ordering},
44 Arc, RwLock,
45 },
46 thread::Builder,
47 },
48 storage::{SerializableStorage, SerializedAppendVecId},
49};
50
51mod newer;
52mod storage;
53mod tests;
54mod utils;
55
56#[cfg(test)]
58pub(crate) use tests::reconstruct_accounts_db_via_serialization;
59
60#[derive(Copy, Clone, Eq, PartialEq)]
61pub(crate) enum SerdeStyle {
62 Newer,
63}
64
65const MAX_STREAM_SIZE: u64 = 32 * 1024 * 1024 * 1024;
66
67#[derive(Clone, Debug, Default, Deserialize, Serialize, AbiExample, PartialEq)]
68pub struct AccountsDbFields<T>(
69 HashMap<Slot, Vec<T>>,
70 StoredMetaWriteVersion,
71 Slot,
72 BankHashInfo,
73 #[serde(deserialize_with = "default_on_eof")]
75 Vec<Slot>,
76 #[serde(deserialize_with = "default_on_eof")]
78 Vec<(Slot, Hash)>,
79 );
81
82pub struct SnapshotStreams<'a, R> {
85 pub full_snapshot_stream: &'a mut BufReader<R>,
86 pub incremental_snapshot_stream: Option<&'a mut BufReader<R>>,
87}
88
89#[derive(Debug)]
92pub struct SnapshotAccountsDbFields<T> {
93 full_snapshot_accounts_db_fields: AccountsDbFields<T>,
94 incremental_snapshot_accounts_db_fields: Option<AccountsDbFields<T>>,
95}
96
97impl<T> SnapshotAccountsDbFields<T> {
98 fn collapse_into(self) -> Result<AccountsDbFields<T>, Error> {
103 match self.incremental_snapshot_accounts_db_fields {
104 None => Ok(self.full_snapshot_accounts_db_fields),
105 Some(AccountsDbFields(
106 mut incremental_snapshot_storages,
107 incremental_snapshot_version,
108 incremental_snapshot_slot,
109 incremental_snapshot_bank_hash_info,
110 incremental_snapshot_historical_roots,
111 incremental_snapshot_historical_roots_with_hash,
112 )) => {
113 let full_snapshot_storages = self.full_snapshot_accounts_db_fields.0;
114 let full_snapshot_slot = self.full_snapshot_accounts_db_fields.2;
115
116 incremental_snapshot_storages.retain(|slot, _| *slot > full_snapshot_slot);
118
119 incremental_snapshot_storages
121 .iter()
122 .all(|storage_entry| !full_snapshot_storages.contains_key(storage_entry.0)).then(|| ()).ok_or_else(|| {
123 io::Error::new(io::ErrorKind::InvalidData, "Snapshots are incompatible: There are storages for the same slot in both the full snapshot and the incremental snapshot!")
124 })?;
125
126 let mut combined_storages = full_snapshot_storages;
127 combined_storages.extend(incremental_snapshot_storages.into_iter());
128
129 Ok(AccountsDbFields(
130 combined_storages,
131 incremental_snapshot_version,
132 incremental_snapshot_slot,
133 incremental_snapshot_bank_hash_info,
134 incremental_snapshot_historical_roots,
135 incremental_snapshot_historical_roots_with_hash,
136 ))
137 }
138 }
139 }
140}
141
142trait TypeContext<'a>: PartialEq {
143 type SerializableAccountStorageEntry: Serialize
144 + DeserializeOwned
145 + From<&'a AccountStorageEntry>
146 + SerializableStorage
147 + Sync;
148
149 fn serialize_bank_and_storage<S: serde::ser::Serializer>(
150 serializer: S,
151 serializable_bank: &SerializableBankAndStorage<'a, Self>,
152 ) -> std::result::Result<S::Ok, S::Error>
153 where
154 Self: std::marker::Sized;
155
156 #[cfg(test)]
157 fn serialize_bank_and_storage_without_extra_fields<S: serde::ser::Serializer>(
158 serializer: S,
159 serializable_bank: &SerializableBankAndStorageNoExtra<'a, Self>,
160 ) -> std::result::Result<S::Ok, S::Error>
161 where
162 Self: std::marker::Sized;
163
164 fn serialize_accounts_db_fields<S: serde::ser::Serializer>(
165 serializer: S,
166 serializable_db: &SerializableAccountsDb<'a, Self>,
167 ) -> std::result::Result<S::Ok, S::Error>
168 where
169 Self: std::marker::Sized;
170
171 fn deserialize_bank_fields<R>(
172 stream: &mut BufReader<R>,
173 ) -> Result<
174 (
175 BankFieldsToDeserialize,
176 AccountsDbFields<Self::SerializableAccountStorageEntry>,
177 ),
178 Error,
179 >
180 where
181 R: Read;
182
183 fn deserialize_accounts_db_fields<R>(
184 stream: &mut BufReader<R>,
185 ) -> Result<AccountsDbFields<Self::SerializableAccountStorageEntry>, Error>
186 where
187 R: Read;
188
189 fn reserialize_bank_fields_with_hash<R, W>(
193 stream_reader: &mut BufReader<R>,
194 stream_writer: &mut BufWriter<W>,
195 accounts_hash: &Hash,
196 incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
197 ) -> std::result::Result<(), Box<bincode::ErrorKind>>
198 where
199 R: Read,
200 W: Write;
201}
202
203fn deserialize_from<R, T>(reader: R) -> bincode::Result<T>
204where
205 R: Read,
206 T: DeserializeOwned,
207{
208 bincode::options()
209 .with_limit(MAX_STREAM_SIZE)
210 .with_fixint_encoding()
211 .allow_trailing_bytes()
212 .deserialize_from::<R, T>(reader)
213}
214
215pub(crate) fn compare_two_serialized_banks(
218 path1: impl AsRef<Path>,
219 path2: impl AsRef<Path>,
220) -> std::result::Result<bool, Error> {
221 use std::fs::File;
222 let file1 = File::open(path1)?;
223 let mut stream1 = BufReader::new(file1);
224 let file2 = File::open(path2)?;
225 let mut stream2 = BufReader::new(file2);
226
227 let fields1 = newer::Context::deserialize_bank_fields(&mut stream1)?;
228 let fields2 = newer::Context::deserialize_bank_fields(&mut stream2)?;
229 Ok(fields1 == fields2)
230}
231
232pub(crate) fn fields_from_stream<R: Read>(
233 serde_style: SerdeStyle,
234 snapshot_stream: &mut BufReader<R>,
235) -> std::result::Result<
236 (
237 BankFieldsToDeserialize,
238 AccountsDbFields<SerializableAccountStorageEntry>,
239 ),
240 Error,
241> {
242 match serde_style {
243 SerdeStyle::Newer => newer::Context::deserialize_bank_fields(snapshot_stream),
244 }
245}
246
247pub(crate) fn fields_from_streams<R: Read>(
248 serde_style: SerdeStyle,
249 snapshot_streams: &mut SnapshotStreams<R>,
250) -> std::result::Result<
251 (
252 BankFieldsToDeserialize,
253 SnapshotAccountsDbFields<SerializableAccountStorageEntry>,
254 ),
255 Error,
256> {
257 let (full_snapshot_bank_fields, full_snapshot_accounts_db_fields) =
258 fields_from_stream(serde_style, snapshot_streams.full_snapshot_stream)?;
259 let incremental_fields = snapshot_streams
260 .incremental_snapshot_stream
261 .as_mut()
262 .map(|stream| fields_from_stream(serde_style, stream))
263 .transpose()?;
264
265 let (incremental_snapshot_bank_fields, incremental_snapshot_accounts_db_fields) =
267 if let Some((bank_fields, accounts_fields)) = incremental_fields {
268 (Some(bank_fields), Some(accounts_fields))
269 } else {
270 (None, None)
271 };
272
273 let snapshot_accounts_db_fields = SnapshotAccountsDbFields {
274 full_snapshot_accounts_db_fields,
275 incremental_snapshot_accounts_db_fields,
276 };
277 Ok((
278 incremental_snapshot_bank_fields.unwrap_or(full_snapshot_bank_fields),
279 snapshot_accounts_db_fields,
280 ))
281}
282
283#[allow(clippy::too_many_arguments)]
284pub(crate) fn bank_from_streams<R>(
285 serde_style: SerdeStyle,
286 snapshot_streams: &mut SnapshotStreams<R>,
287 account_paths: &[PathBuf],
288 unpacked_append_vec_map: UnpackedAppendVecMap,
289 genesis_config: &GenesisConfig,
290 debug_keys: Option<Arc<HashSet<Pubkey>>>,
291 additional_builtins: Option<&Builtins>,
292 account_secondary_indexes: AccountSecondaryIndexes,
293 caching_enabled: bool,
294 limit_load_slot_count_from_snapshot: Option<usize>,
295 shrink_ratio: AccountShrinkThreshold,
296 verify_index: bool,
297 accounts_db_config: Option<AccountsDbConfig>,
298 accounts_update_notifier: Option<AccountsUpdateNotifier>,
299) -> std::result::Result<Bank, Error>
300where
301 R: Read,
302{
303 let (bank_fields, accounts_db_fields) = fields_from_streams(serde_style, snapshot_streams)?;
304 reconstruct_bank_from_fields(
305 bank_fields,
306 accounts_db_fields,
307 genesis_config,
308 account_paths,
309 unpacked_append_vec_map,
310 debug_keys,
311 additional_builtins,
312 account_secondary_indexes,
313 caching_enabled,
314 limit_load_slot_count_from_snapshot,
315 shrink_ratio,
316 verify_index,
317 accounts_db_config,
318 accounts_update_notifier,
319 )
320}
321
322pub(crate) fn bank_to_stream<W>(
323 serde_style: SerdeStyle,
324 stream: &mut BufWriter<W>,
325 bank: &Bank,
326 snapshot_storages: &[SnapshotStorage],
327) -> Result<(), Error>
328where
329 W: Write,
330{
331 match serde_style {
332 SerdeStyle::Newer => bincode::serialize_into(
333 stream,
334 &SerializableBankAndStorage::<newer::Context> {
335 bank,
336 snapshot_storages,
337 phantom: std::marker::PhantomData::default(),
338 },
339 ),
340 }
341}
342
343#[cfg(test)]
344pub(crate) fn bank_to_stream_no_extra_fields<W>(
345 serde_style: SerdeStyle,
346 stream: &mut BufWriter<W>,
347 bank: &Bank,
348 snapshot_storages: &[SnapshotStorage],
349) -> Result<(), Error>
350where
351 W: Write,
352{
353 match serde_style {
354 SerdeStyle::Newer => bincode::serialize_into(
355 stream,
356 &SerializableBankAndStorageNoExtra::<newer::Context> {
357 bank,
358 snapshot_storages,
359 phantom: std::marker::PhantomData::default(),
360 },
361 ),
362 }
363}
364
365fn reserialize_bank_fields_with_new_hash<W, R>(
369 stream_reader: &mut BufReader<R>,
370 stream_writer: &mut BufWriter<W>,
371 accounts_hash: &Hash,
372 incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
373) -> Result<(), Error>
374where
375 W: Write,
376 R: Read,
377{
378 newer::Context::reserialize_bank_fields_with_hash(
379 stream_reader,
380 stream_writer,
381 accounts_hash,
382 incremental_snapshot_persistence,
383 )
384}
385
386pub fn reserialize_bank_with_new_accounts_hash(
392 bank_snapshots_dir: impl AsRef<Path>,
393 slot: Slot,
394 accounts_hash: &Hash,
395 incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
396) -> bool {
397 let bank_post = snapshot_utils::get_bank_snapshots_dir(bank_snapshots_dir, slot);
398 let bank_post = bank_post.join(snapshot_utils::get_snapshot_file_name(slot));
399 let mut bank_pre = bank_post.clone();
400 bank_pre.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
401
402 let mut found = false;
403 {
404 let file = std::fs::File::open(&bank_pre);
405 if let Ok(file) = file {
407 found = true;
408 let file_out = std::fs::File::create(bank_post).unwrap();
409 reserialize_bank_fields_with_new_hash(
410 &mut BufReader::new(file),
411 &mut BufWriter::new(file_out),
412 accounts_hash,
413 incremental_snapshot_persistence,
414 )
415 .unwrap();
416 }
417 }
418 if found {
419 std::fs::remove_file(bank_pre).unwrap();
420 }
421 found
422}
423
424struct SerializableBankAndStorage<'a, C> {
425 bank: &'a Bank,
426 snapshot_storages: &'a [SnapshotStorage],
427 phantom: std::marker::PhantomData<C>,
428}
429
430impl<'a, C: TypeContext<'a>> Serialize for SerializableBankAndStorage<'a, C> {
431 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
432 where
433 S: serde::ser::Serializer,
434 {
435 C::serialize_bank_and_storage(serializer, self)
436 }
437}
438
439#[cfg(test)]
440struct SerializableBankAndStorageNoExtra<'a, C> {
441 bank: &'a Bank,
442 snapshot_storages: &'a [SnapshotStorage],
443 phantom: std::marker::PhantomData<C>,
444}
445
446#[cfg(test)]
447impl<'a, C: TypeContext<'a>> Serialize for SerializableBankAndStorageNoExtra<'a, C> {
448 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
449 where
450 S: serde::ser::Serializer,
451 {
452 C::serialize_bank_and_storage_without_extra_fields(serializer, self)
453 }
454}
455
456#[cfg(test)]
457impl<'a, C> From<SerializableBankAndStorageNoExtra<'a, C>> for SerializableBankAndStorage<'a, C> {
458 fn from(s: SerializableBankAndStorageNoExtra<'a, C>) -> SerializableBankAndStorage<'a, C> {
459 let SerializableBankAndStorageNoExtra {
460 bank,
461 snapshot_storages,
462 phantom,
463 } = s;
464 SerializableBankAndStorage {
465 bank,
466 snapshot_storages,
467 phantom,
468 }
469 }
470}
471
472struct SerializableAccountsDb<'a, C> {
473 accounts_db: &'a AccountsDb,
474 slot: Slot,
475 account_storage_entries: &'a [SnapshotStorage],
476 phantom: std::marker::PhantomData<C>,
477}
478
479impl<'a, C: TypeContext<'a>> Serialize for SerializableAccountsDb<'a, C> {
480 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
481 where
482 S: serde::ser::Serializer,
483 {
484 C::serialize_accounts_db_fields(serializer, self)
485 }
486}
487
488#[cfg(RUSTC_WITH_SPECIALIZATION)]
489impl<'a, C> safecoin_frozen_abi::abi_example::IgnoreAsHelper for SerializableAccountsDb<'a, C> {}
490
491#[allow(clippy::too_many_arguments)]
492fn reconstruct_bank_from_fields<E>(
493 bank_fields: BankFieldsToDeserialize,
494 snapshot_accounts_db_fields: SnapshotAccountsDbFields<E>,
495 genesis_config: &GenesisConfig,
496 account_paths: &[PathBuf],
497 unpacked_append_vec_map: UnpackedAppendVecMap,
498 debug_keys: Option<Arc<HashSet<Pubkey>>>,
499 additional_builtins: Option<&Builtins>,
500 account_secondary_indexes: AccountSecondaryIndexes,
501 caching_enabled: bool,
502 limit_load_slot_count_from_snapshot: Option<usize>,
503 shrink_ratio: AccountShrinkThreshold,
504 verify_index: bool,
505 accounts_db_config: Option<AccountsDbConfig>,
506 accounts_update_notifier: Option<AccountsUpdateNotifier>,
507) -> Result<Bank, Error>
508where
509 E: SerializableStorage + std::marker::Sync,
510{
511 let (accounts_db, reconstructed_accounts_db_info) = reconstruct_accountsdb_from_fields(
512 snapshot_accounts_db_fields,
513 account_paths,
514 unpacked_append_vec_map,
515 genesis_config,
516 account_secondary_indexes,
517 caching_enabled,
518 limit_load_slot_count_from_snapshot,
519 shrink_ratio,
520 verify_index,
521 accounts_db_config,
522 accounts_update_notifier,
523 )?;
524
525 let bank_rc = BankRc::new(Accounts::new_empty(accounts_db), bank_fields.slot);
526
527 let debug_do_not_add_builtins = limit_load_slot_count_from_snapshot.is_some();
529 let bank = Bank::new_from_fields(
530 bank_rc,
531 genesis_config,
532 bank_fields,
533 debug_keys,
534 additional_builtins,
535 debug_do_not_add_builtins,
536 reconstructed_accounts_db_info.accounts_data_len,
537 );
538
539 info!("rent_collector: {:?}", bank.rent_collector());
540
541 Ok(bank)
542}
543
544fn reconstruct_single_storage(
545 slot: &Slot,
546 append_vec_path: &Path,
547 current_len: usize,
548 append_vec_id: AppendVecId,
549) -> io::Result<Arc<AccountStorageEntry>> {
550 let (accounts, num_accounts) = AppendVec::new_from_file(append_vec_path, current_len)?;
551 Ok(Arc::new(AccountStorageEntry::new_existing(
552 *slot,
553 append_vec_id,
554 accounts,
555 num_accounts,
556 )))
557}
558
559fn remap_append_vec_file(
560 slot: Slot,
561 old_append_vec_id: SerializedAppendVecId,
562 append_vec_path: &Path,
563 next_append_vec_id: &AtomicAppendVecId,
564 num_collisions: &AtomicUsize,
565) -> io::Result<(AppendVecId, PathBuf)> {
566 let (remapped_append_vec_id, remapped_append_vec_path) = loop {
569 let remapped_append_vec_id = next_append_vec_id.fetch_add(1, Ordering::AcqRel);
570 let remapped_file_name = AppendVec::file_name(slot, remapped_append_vec_id);
571 let remapped_append_vec_path = append_vec_path.parent().unwrap().join(&remapped_file_name);
572
573 if old_append_vec_id == remapped_append_vec_id as SerializedAppendVecId
581 || std::fs::metadata(&remapped_append_vec_path).is_err()
582 {
583 break (remapped_append_vec_id, remapped_append_vec_path);
584 }
585
586 num_collisions.fetch_add(1, Ordering::Relaxed);
589 };
590 if old_append_vec_id != remapped_append_vec_id as SerializedAppendVecId {
592 std::fs::rename(append_vec_path, &remapped_append_vec_path)?;
593 }
594
595 Ok((remapped_append_vec_id, remapped_append_vec_path))
596}
597
598fn remap_and_reconstruct_single_storage(
599 slot: Slot,
600 old_append_vec_id: SerializedAppendVecId,
601 current_len: usize,
602 append_vec_path: &Path,
603 next_append_vec_id: &AtomicAppendVecId,
604 num_collisions: &AtomicUsize,
605) -> io::Result<Arc<AccountStorageEntry>> {
606 let (remapped_append_vec_id, remapped_append_vec_path) = remap_append_vec_file(
607 slot,
608 old_append_vec_id,
609 append_vec_path,
610 next_append_vec_id,
611 num_collisions,
612 )?;
613 let storage = reconstruct_single_storage(
614 &slot,
615 &remapped_append_vec_path,
616 current_len,
617 remapped_append_vec_id,
618 )?;
619 Ok(storage)
620}
621
622fn remap_and_reconstruct_slot_storage<E>(
623 slot: Slot,
624 slot_storage: &[E],
625 unpacked_append_vec_map: &UnpackedAppendVecMap,
626 next_append_vec_id: &AtomicAppendVecId,
627 num_collisions: &AtomicUsize,
628) -> Result<HashMap<AppendVecId, Arc<AccountStorageEntry>>, Error>
629where
630 E: SerializableStorage,
631{
632 slot_storage
633 .iter()
634 .map(|storage_entry| {
635 let file_name = AppendVec::file_name(slot, storage_entry.id());
636 let append_vec_path = unpacked_append_vec_map.get(&file_name).ok_or_else(|| {
637 io::Error::new(
638 io::ErrorKind::NotFound,
639 format!("{} not found in unpacked append vecs", file_name),
640 )
641 })?;
642
643 let new_storage_entry = remap_and_reconstruct_single_storage(
644 slot,
645 storage_entry.id(),
646 storage_entry.current_len(),
647 append_vec_path,
648 next_append_vec_id,
649 num_collisions,
650 )?;
651 Ok((new_storage_entry.append_vec_id(), new_storage_entry))
652 })
653 .collect::<Result<HashMap<AppendVecId, _>, Error>>()
654}
655
656fn remap_and_reconstruct_storages<E>(
657 snapshot_storages: Vec<(Slot, Vec<E>)>,
658 unpacked_append_vec_map: &UnpackedAppendVecMap,
659 next_append_vec_id: &AtomicAppendVecId,
660 num_collisions: &AtomicUsize,
661) -> Result<HashMap<Slot, HashMap<AppendVecId, Arc<AccountStorageEntry>>>, Error>
662where
663 E: SerializableStorage + std::marker::Sync,
664{
665 snapshot_storages
666 .into_par_iter()
667 .map(|(slot, slot_storage)| {
668 Ok((
669 *slot,
670 remap_and_reconstruct_slot_storage(
671 *slot,
672 slot_storage,
673 unpacked_append_vec_map,
674 next_append_vec_id,
675 num_collisions,
676 )?,
677 ))
678 })
679 .collect::<Result<HashMap<Slot, _>, Error>>()
680}
681
682#[derive(Debug, Default, Copy, Clone)]
684struct ReconstructedAccountsDbInfo {
685 accounts_data_len: u64,
686}
687
688#[allow(clippy::too_many_arguments)]
689fn reconstruct_accountsdb_from_fields<E>(
690 snapshot_accounts_db_fields: SnapshotAccountsDbFields<E>,
691 account_paths: &[PathBuf],
692 unpacked_append_vec_map: UnpackedAppendVecMap,
693 genesis_config: &GenesisConfig,
694 account_secondary_indexes: AccountSecondaryIndexes,
695 caching_enabled: bool,
696 limit_load_slot_count_from_snapshot: Option<usize>,
697 shrink_ratio: AccountShrinkThreshold,
698 verify_index: bool,
699 accounts_db_config: Option<AccountsDbConfig>,
700 accounts_update_notifier: Option<AccountsUpdateNotifier>,
701) -> Result<(AccountsDb, ReconstructedAccountsDbInfo), Error>
702where
703 E: SerializableStorage + std::marker::Sync,
704{
705 let mut accounts_db = AccountsDb::new_with_config(
706 account_paths.to_vec(),
707 &genesis_config.cluster_type,
708 account_secondary_indexes,
709 caching_enabled,
710 shrink_ratio,
711 accounts_db_config,
712 accounts_update_notifier,
713 );
714
715 let AccountsDbFields(
716 snapshot_storages,
717 snapshot_version,
718 snapshot_slot,
719 snapshot_bank_hash_info,
720 snapshot_historical_roots,
721 snapshot_historical_roots_with_hash,
722 ) = snapshot_accounts_db_fields.collapse_into()?;
723
724 let snapshot_storages = snapshot_storages.into_iter().collect::<Vec<_>>();
725
726 for path in &accounts_db.paths {
728 std::fs::create_dir_all(path)
729 .unwrap_or_else(|err| panic!("Failed to create directory {}: {}", path.display(), err));
730 }
731
732 reconstruct_historical_roots(
733 &accounts_db,
734 snapshot_historical_roots,
735 snapshot_historical_roots_with_hash,
736 );
737
738 let num_collisions = AtomicUsize::new(0);
740 let next_append_vec_id = AtomicAppendVecId::new(0);
741 let (mut storage, measure_remap) = measure!(remap_and_reconstruct_storages(
742 snapshot_storages,
743 &unpacked_append_vec_map,
744 &next_append_vec_id,
745 &num_collisions
746 )?);
747
748 storage.retain(|_slot, stores| !stores.is_empty());
752 assert!(
753 !storage.is_empty(),
754 "At least one storage entry must exist from deserializing stream"
755 );
756
757 let next_append_vec_id = next_append_vec_id.load(Ordering::Acquire);
758 let max_append_vec_id = next_append_vec_id - 1;
759 assert!(
760 max_append_vec_id <= AppendVecId::MAX / 2,
761 "Storage id {} larger than allowed max",
762 max_append_vec_id
763 );
764
765 accounts_db
767 .bank_hashes
768 .write()
769 .unwrap()
770 .insert(snapshot_slot, snapshot_bank_hash_info);
771 accounts_db.storage.map.extend(
772 storage
773 .into_iter()
774 .map(|(slot, slot_storage_entry)| (slot, Arc::new(RwLock::new(slot_storage_entry)))),
775 );
776 accounts_db
777 .next_id
778 .store(next_append_vec_id, Ordering::Release);
779 accounts_db
780 .write_version
781 .fetch_add(snapshot_version, Ordering::Release);
782
783 let mut measure_notify = Measure::start("accounts_notify");
784
785 let accounts_db = Arc::new(accounts_db);
786 let accounts_db_clone = accounts_db.clone();
787 let handle = Builder::new()
788 .name("solNfyAccRestor".to_string())
789 .spawn(move || {
790 accounts_db_clone.notify_account_restore_from_snapshot();
791 })
792 .unwrap();
793
794 let IndexGenerationInfo {
795 accounts_data_len,
796 rent_paying_accounts_by_partition,
797 } = accounts_db.generate_index(
798 limit_load_slot_count_from_snapshot,
799 verify_index,
800 genesis_config,
801 );
802 accounts_db
803 .accounts_index
804 .rent_paying_accounts_by_partition
805 .set(rent_paying_accounts_by_partition)
806 .unwrap();
807
808 accounts_db.maybe_add_filler_accounts(
809 &genesis_config.epoch_schedule,
810 &genesis_config.rent,
811 snapshot_slot,
812 );
813
814 handle.join().unwrap();
815 measure_notify.stop();
816
817 datapoint_info!(
818 "reconstruct_accountsdb_from_fields()",
819 ("remap-time-us", measure_remap.as_us(), i64),
820 (
821 "remap-collisions",
822 num_collisions.load(Ordering::Relaxed),
823 i64
824 ),
825 ("accountsdb-notify-at-start-us", measure_notify.as_us(), i64),
826 );
827
828 Ok((
829 Arc::try_unwrap(accounts_db).unwrap(),
830 ReconstructedAccountsDbInfo { accounts_data_len },
831 ))
832}
833
834fn reconstruct_historical_roots(
836 accounts_db: &AccountsDb,
837 mut snapshot_historical_roots: Vec<Slot>,
838 snapshot_historical_roots_with_hash: Vec<(Slot, Hash)>,
839) {
840 snapshot_historical_roots.extend(
845 snapshot_historical_roots_with_hash
846 .into_iter()
847 .map(|(root, _)| root),
848 );
849 snapshot_historical_roots.sort_unstable();
850 let mut roots_tracker = accounts_db.accounts_index.roots_tracker.write().unwrap();
851 snapshot_historical_roots.into_iter().for_each(|root| {
852 roots_tracker.historical_roots.insert(root);
853 });
854}