// solana_runtime/serde_snapshot.rs

use {
    crate::{
        accounts::Accounts,
        accounts_db::{
            AccountShrinkThreshold, AccountStorageEntry, AccountsDb, AccountsDbConfig, AppendVecId,
            AtomicAppendVecId, BankHashInfo, IndexGenerationInfo, SnapshotStorage,
        },
        accounts_index::AccountSecondaryIndexes,
        accounts_update_notifier_interface::AccountsUpdateNotifier,
        append_vec::{AppendVec, StoredMetaWriteVersion},
        bank::{Bank, BankFieldsToDeserialize, BankIncrementalSnapshotPersistence, BankRc},
        blockhash_queue::BlockhashQueue,
        builtins::Builtins,
        epoch_stakes::EpochStakes,
        hardened_unpack::UnpackedAppendVecMap,
        rent_collector::RentCollector,
        serde_snapshot::storage::SerializableAccountStorageEntry,
        snapshot_utils::{self, BANK_SNAPSHOT_PRE_FILENAME_EXTENSION},
        stakes::Stakes,
    },
    bincode::{self, config::Options, Error},
    log::*,
    rayon::prelude::*,
    serde::{de::DeserializeOwned, Deserialize, Serialize},
    safecoin_measure::{measure, measure::Measure},
    solana_sdk::{
        clock::{Epoch, Slot, UnixTimestamp},
        deserialize_utils::default_on_eof,
        epoch_schedule::EpochSchedule,
        fee_calculator::{FeeCalculator, FeeRateGovernor},
        genesis_config::GenesisConfig,
        hard_forks::HardForks,
        hash::Hash,
        inflation::Inflation,
        pubkey::Pubkey,
    },
    std::{
        collections::{HashMap, HashSet},
        io::{self, BufReader, BufWriter, Read, Write},
        path::{Path, PathBuf},
        result::Result,
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc, RwLock,
        },
        thread::Builder,
    },
    storage::{SerializableStorage, SerializedAppendVecId},
};

mod newer;
mod storage;
mod tests;
mod utils;

// a number of test cases in accounts_db use this
#[cfg(test)]
pub(crate) use tests::reconstruct_accounts_db_via_serialization;

#[derive(Copy, Clone, Eq, PartialEq)]
pub(crate) enum SerdeStyle {
    Newer,
}

const MAX_STREAM_SIZE: u64 = 32 * 1024 * 1024 * 1024;

#[derive(Clone, Debug, Default, Deserialize, Serialize, AbiExample, PartialEq)]
pub struct AccountsDbFields<T>(
    HashMap<Slot, Vec<T>>,
    StoredMetaWriteVersion,
    Slot,
    BankHashInfo,
    /// all slots that were roots within the last epoch
    #[serde(deserialize_with = "default_on_eof")]
    Vec<Slot>,
    /// slots that were roots within the last epoch for which we care about the hash value
    #[serde(deserialize_with = "default_on_eof")]
    Vec<(Slot, Hash)>,
);

/// Helper type to wrap BufReader streams when deserializing and reconstructing from either just a
/// full snapshot, or both a full and incremental snapshot
pub struct SnapshotStreams<'a, R> {
    pub full_snapshot_stream: &'a mut BufReader<R>,
    pub incremental_snapshot_stream: Option<&'a mut BufReader<R>>,
}
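
// Illustrative sketch only (not part of the runtime API): one way a caller could assemble a
// `SnapshotStreams` from an unpacked full snapshot file plus an optional incremental snapshot
// file. The function name and paths are hypothetical; real callers live in snapshot_utils.
#[cfg(test)]
#[allow(dead_code)]
fn example_snapshot_streams(
    full_snapshot_path: &Path,
    incremental_snapshot_path: Option<&Path>,
) -> io::Result<()> {
    use std::fs::File;
    let mut full_snapshot_stream = BufReader::new(File::open(full_snapshot_path)?);
    let mut incremental_snapshot_stream = incremental_snapshot_path
        .map(File::open)
        .transpose()?
        .map(BufReader::new);
    let _snapshot_streams = SnapshotStreams {
        full_snapshot_stream: &mut full_snapshot_stream,
        incremental_snapshot_stream: incremental_snapshot_stream.as_mut(),
    };
    Ok(())
}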

/// Helper type to wrap AccountsDbFields when reconstructing AccountsDb from either just a full
/// snapshot, or both a full and incremental snapshot
#[derive(Debug)]
pub struct SnapshotAccountsDbFields<T> {
    full_snapshot_accounts_db_fields: AccountsDbFields<T>,
    incremental_snapshot_accounts_db_fields: Option<AccountsDbFields<T>>,
}

impl<T> SnapshotAccountsDbFields<T> {
    /// Collapse the SnapshotAccountsDbFields into a single AccountsDbFields.  If there is no
    /// incremental snapshot, this returns the AccountsDbFields from the full snapshot.
    /// Otherwise, it returns the AccountsDbFields from the incremental snapshot, with the
    /// storages combined from both the full and incremental snapshots.
    fn collapse_into(self) -> Result<AccountsDbFields<T>, Error> {
        match self.incremental_snapshot_accounts_db_fields {
            None => Ok(self.full_snapshot_accounts_db_fields),
            Some(AccountsDbFields(
                mut incremental_snapshot_storages,
                incremental_snapshot_version,
                incremental_snapshot_slot,
                incremental_snapshot_bank_hash_info,
                incremental_snapshot_historical_roots,
                incremental_snapshot_historical_roots_with_hash,
            )) => {
                let full_snapshot_storages = self.full_snapshot_accounts_db_fields.0;
                let full_snapshot_slot = self.full_snapshot_accounts_db_fields.2;

                // filter out incremental snapshot storages with slot <= full snapshot slot
                incremental_snapshot_storages.retain(|slot, _| *slot > full_snapshot_slot);

                // There must not be any overlap in the slots of storages between the full snapshot and the incremental snapshot
                incremental_snapshot_storages
                    .iter()
                    .all(|storage_entry| !full_snapshot_storages.contains_key(storage_entry.0)).then(|| ()).ok_or_else(|| {
                        io::Error::new(io::ErrorKind::InvalidData, "Snapshots are incompatible: There are storages for the same slot in both the full snapshot and the incremental snapshot!")
                    })?;

                let mut combined_storages = full_snapshot_storages;
                combined_storages.extend(incremental_snapshot_storages.into_iter());

                Ok(AccountsDbFields(
                    combined_storages,
                    incremental_snapshot_version,
                    incremental_snapshot_slot,
                    incremental_snapshot_bank_hash_info,
                    incremental_snapshot_historical_roots,
                    incremental_snapshot_historical_roots_with_hash,
                ))
            }
        }
    }
}
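
// Minimal sketch of the overlap rule that `collapse_into()` enforces above, using plain maps
// with dummy values in place of real storage entries: any slot that appears in both the full
// and the incremental snapshot storages is rejected as incompatible input. This helper exists
// purely for illustration.
#[cfg(test)]
#[allow(dead_code)]
fn example_overlap_check(
    full_snapshot_storages: &HashMap<Slot, Vec<u8>>,
    incremental_snapshot_storages: &HashMap<Slot, Vec<u8>>,
) -> Result<(), Error> {
    incremental_snapshot_storages
        .iter()
        .all(|(slot, _)| !full_snapshot_storages.contains_key(slot))
        .then(|| ())
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "storages for the same slot in both the full and incremental snapshot",
            )
        })?;
    Ok(())
}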

trait TypeContext<'a>: PartialEq {
    type SerializableAccountStorageEntry: Serialize
        + DeserializeOwned
        + From<&'a AccountStorageEntry>
        + SerializableStorage
        + Sync;

    fn serialize_bank_and_storage<S: serde::ser::Serializer>(
        serializer: S,
        serializable_bank: &SerializableBankAndStorage<'a, Self>,
    ) -> std::result::Result<S::Ok, S::Error>
    where
        Self: std::marker::Sized;

    #[cfg(test)]
    fn serialize_bank_and_storage_without_extra_fields<S: serde::ser::Serializer>(
        serializer: S,
        serializable_bank: &SerializableBankAndStorageNoExtra<'a, Self>,
    ) -> std::result::Result<S::Ok, S::Error>
    where
        Self: std::marker::Sized;

    fn serialize_accounts_db_fields<S: serde::ser::Serializer>(
        serializer: S,
        serializable_db: &SerializableAccountsDb<'a, Self>,
    ) -> std::result::Result<S::Ok, S::Error>
    where
        Self: std::marker::Sized;

    fn deserialize_bank_fields<R>(
        stream: &mut BufReader<R>,
    ) -> Result<
        (
            BankFieldsToDeserialize,
            AccountsDbFields<Self::SerializableAccountStorageEntry>,
        ),
        Error,
    >
    where
        R: Read;

    fn deserialize_accounts_db_fields<R>(
        stream: &mut BufReader<R>,
    ) -> Result<AccountsDbFields<Self::SerializableAccountStorageEntry>, Error>
    where
        R: Read;

    /// deserialize the bank from 'stream_reader'
    /// modify the accounts_hash
    /// reserialize the bank to 'stream_writer'
    fn reserialize_bank_fields_with_hash<R, W>(
        stream_reader: &mut BufReader<R>,
        stream_writer: &mut BufWriter<W>,
        accounts_hash: &Hash,
        incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
    ) -> std::result::Result<(), Box<bincode::ErrorKind>>
    where
        R: Read,
        W: Write;
}

fn deserialize_from<R, T>(reader: R) -> bincode::Result<T>
where
    R: Read,
    T: DeserializeOwned,
{
    bincode::options()
        .with_limit(MAX_STREAM_SIZE)
        .with_fixint_encoding()
        .allow_trailing_bytes()
        .deserialize_from::<R, T>(reader)
}
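
// Self-contained illustration of the bincode options used by `deserialize_from()` above: the
// stream is fixed-int encoded, capped at MAX_STREAM_SIZE, and trailing bytes are tolerated.
// The sample value and the serializer configuration here exist only for this round trip; the
// real snapshot payloads are produced elsewhere.
#[test]
fn example_deserialize_from_round_trip() -> bincode::Result<()> {
    let original: (u64, Vec<u8>) = (42, vec![1, 2, 3]);
    let mut bytes = bincode::options().with_fixint_encoding().serialize(&original)?;
    // Extra bytes after the encoded value are ignored thanks to `allow_trailing_bytes()`.
    bytes.extend_from_slice(&[0xde, 0xad]);
    let round_tripped: (u64, Vec<u8>) = deserialize_from(&bytes[..])?;
    assert_eq!(original, round_tripped);
    Ok(())
}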

/// used by tests to compare the contents of serialized bank fields
/// the serialized format is not deterministic, likely due to randomness in structs like hashmaps
pub(crate) fn compare_two_serialized_banks(
    path1: impl AsRef<Path>,
    path2: impl AsRef<Path>,
) -> std::result::Result<bool, Error> {
    use std::fs::File;
    let file1 = File::open(path1)?;
    let mut stream1 = BufReader::new(file1);
    let file2 = File::open(path2)?;
    let mut stream2 = BufReader::new(file2);

    let fields1 = newer::Context::deserialize_bank_fields(&mut stream1)?;
    let fields2 = newer::Context::deserialize_bank_fields(&mut stream2)?;
    Ok(fields1 == fields2)
}

pub(crate) fn fields_from_stream<R: Read>(
    serde_style: SerdeStyle,
    snapshot_stream: &mut BufReader<R>,
) -> std::result::Result<
    (
        BankFieldsToDeserialize,
        AccountsDbFields<SerializableAccountStorageEntry>,
    ),
    Error,
> {
    match serde_style {
        SerdeStyle::Newer => newer::Context::deserialize_bank_fields(snapshot_stream),
    }
}

pub(crate) fn fields_from_streams<R: Read>(
    serde_style: SerdeStyle,
    snapshot_streams: &mut SnapshotStreams<R>,
) -> std::result::Result<
    (
        BankFieldsToDeserialize,
        SnapshotAccountsDbFields<SerializableAccountStorageEntry>,
    ),
    Error,
> {
    let (full_snapshot_bank_fields, full_snapshot_accounts_db_fields) =
        fields_from_stream(serde_style, snapshot_streams.full_snapshot_stream)?;
    let incremental_fields = snapshot_streams
        .incremental_snapshot_stream
        .as_mut()
        .map(|stream| fields_from_stream(serde_style, stream))
        .transpose()?;

    // Option::unzip() not stabilized yet
    let (incremental_snapshot_bank_fields, incremental_snapshot_accounts_db_fields) =
        if let Some((bank_fields, accounts_fields)) = incremental_fields {
            (Some(bank_fields), Some(accounts_fields))
        } else {
            (None, None)
        };

    let snapshot_accounts_db_fields = SnapshotAccountsDbFields {
        full_snapshot_accounts_db_fields,
        incremental_snapshot_accounts_db_fields,
    };
    Ok((
        incremental_snapshot_bank_fields.unwrap_or(full_snapshot_bank_fields),
        snapshot_accounts_db_fields,
    ))
}
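
// The manual unzip in `fields_from_streams()` exists only because `Option::unzip()` was not
// yet stabilized when this was written; this stand-alone sketch shows the same transformation
// on a plain example pair.
#[cfg(test)]
#[allow(dead_code)]
fn example_manual_option_unzip<A, B>(pair: Option<(A, B)>) -> (Option<A>, Option<B>) {
    if let Some((left, right)) = pair {
        (Some(left), Some(right))
    } else {
        (None, None)
    }
}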

#[allow(clippy::too_many_arguments)]
pub(crate) fn bank_from_streams<R>(
    serde_style: SerdeStyle,
    snapshot_streams: &mut SnapshotStreams<R>,
    account_paths: &[PathBuf],
    unpacked_append_vec_map: UnpackedAppendVecMap,
    genesis_config: &GenesisConfig,
    debug_keys: Option<Arc<HashSet<Pubkey>>>,
    additional_builtins: Option<&Builtins>,
    account_secondary_indexes: AccountSecondaryIndexes,
    caching_enabled: bool,
    limit_load_slot_count_from_snapshot: Option<usize>,
    shrink_ratio: AccountShrinkThreshold,
    verify_index: bool,
    accounts_db_config: Option<AccountsDbConfig>,
    accounts_update_notifier: Option<AccountsUpdateNotifier>,
) -> std::result::Result<Bank, Error>
where
    R: Read,
{
    let (bank_fields, accounts_db_fields) = fields_from_streams(serde_style, snapshot_streams)?;
    reconstruct_bank_from_fields(
        bank_fields,
        accounts_db_fields,
        genesis_config,
        account_paths,
        unpacked_append_vec_map,
        debug_keys,
        additional_builtins,
        account_secondary_indexes,
        caching_enabled,
        limit_load_slot_count_from_snapshot,
        shrink_ratio,
        verify_index,
        accounts_db_config,
        accounts_update_notifier,
    )
}

pub(crate) fn bank_to_stream<W>(
    serde_style: SerdeStyle,
    stream: &mut BufWriter<W>,
    bank: &Bank,
    snapshot_storages: &[SnapshotStorage],
) -> Result<(), Error>
where
    W: Write,
{
    match serde_style {
        SerdeStyle::Newer => bincode::serialize_into(
            stream,
            &SerializableBankAndStorage::<newer::Context> {
                bank,
                snapshot_storages,
                phantom: std::marker::PhantomData::default(),
            },
        ),
    }
}

#[cfg(test)]
pub(crate) fn bank_to_stream_no_extra_fields<W>(
    serde_style: SerdeStyle,
    stream: &mut BufWriter<W>,
    bank: &Bank,
    snapshot_storages: &[SnapshotStorage],
) -> Result<(), Error>
where
    W: Write,
{
    match serde_style {
        SerdeStyle::Newer => bincode::serialize_into(
            stream,
            &SerializableBankAndStorageNoExtra::<newer::Context> {
                bank,
                snapshot_storages,
                phantom: std::marker::PhantomData::default(),
            },
        ),
    }
}

/// deserialize the bank from 'stream_reader'
/// modify the accounts_hash
/// reserialize the bank to 'stream_writer'
fn reserialize_bank_fields_with_new_hash<W, R>(
    stream_reader: &mut BufReader<R>,
    stream_writer: &mut BufWriter<W>,
    accounts_hash: &Hash,
    incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> Result<(), Error>
where
    W: Write,
    R: Read,
{
    newer::Context::reserialize_bank_fields_with_hash(
        stream_reader,
        stream_writer,
        accounts_hash,
        incremental_snapshot_persistence,
    )
}

/// effectively updates the accounts hash in the serialized bank file on disk
/// read serialized bank from pre file
/// update accounts_hash
/// write serialized bank to post file
/// return true if pre file found
pub fn reserialize_bank_with_new_accounts_hash(
    bank_snapshots_dir: impl AsRef<Path>,
    slot: Slot,
    accounts_hash: &Hash,
    incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> bool {
    let bank_post = snapshot_utils::get_bank_snapshots_dir(bank_snapshots_dir, slot);
    let bank_post = bank_post.join(snapshot_utils::get_snapshot_file_name(slot));
    let mut bank_pre = bank_post.clone();
    bank_pre.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);

    let mut found = false;
    {
        let file = std::fs::File::open(&bank_pre);
        // some tests don't create the file
        if let Ok(file) = file {
            found = true;
            let file_out = std::fs::File::create(bank_post).unwrap();
            reserialize_bank_fields_with_new_hash(
                &mut BufReader::new(file),
                &mut BufWriter::new(file_out),
                accounts_hash,
                incremental_snapshot_persistence,
            )
            .unwrap();
        }
    }
    if found {
        std::fs::remove_file(bank_pre).unwrap();
    }
    found
}
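
// Sketch of the pre/post file naming used by `reserialize_bank_with_new_accounts_hash()` above,
// assuming the bank snapshot file is named by snapshot_utils and that the "pre" marker is
// applied as a file extension. The directory below is hypothetical and only demonstrates the
// path manipulation.
#[cfg(test)]
#[allow(dead_code)]
fn example_pre_and_post_snapshot_paths(slot: Slot) -> (PathBuf, PathBuf) {
    let bank_snapshots_dir = PathBuf::from("/tmp/bank-snapshots");
    let bank_post = snapshot_utils::get_bank_snapshots_dir(&bank_snapshots_dir, slot)
        .join(snapshot_utils::get_snapshot_file_name(slot));
    let mut bank_pre = bank_post.clone();
    bank_pre.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
    (bank_pre, bank_post)
}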

struct SerializableBankAndStorage<'a, C> {
    bank: &'a Bank,
    snapshot_storages: &'a [SnapshotStorage],
    phantom: std::marker::PhantomData<C>,
}

impl<'a, C: TypeContext<'a>> Serialize for SerializableBankAndStorage<'a, C> {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        C::serialize_bank_and_storage(serializer, self)
    }
}

#[cfg(test)]
struct SerializableBankAndStorageNoExtra<'a, C> {
    bank: &'a Bank,
    snapshot_storages: &'a [SnapshotStorage],
    phantom: std::marker::PhantomData<C>,
}

#[cfg(test)]
impl<'a, C: TypeContext<'a>> Serialize for SerializableBankAndStorageNoExtra<'a, C> {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        C::serialize_bank_and_storage_without_extra_fields(serializer, self)
    }
}

#[cfg(test)]
impl<'a, C> From<SerializableBankAndStorageNoExtra<'a, C>> for SerializableBankAndStorage<'a, C> {
    fn from(s: SerializableBankAndStorageNoExtra<'a, C>) -> SerializableBankAndStorage<'a, C> {
        let SerializableBankAndStorageNoExtra {
            bank,
            snapshot_storages,
            phantom,
        } = s;
        SerializableBankAndStorage {
            bank,
            snapshot_storages,
            phantom,
        }
    }
}

struct SerializableAccountsDb<'a, C> {
    accounts_db: &'a AccountsDb,
    slot: Slot,
    account_storage_entries: &'a [SnapshotStorage],
    phantom: std::marker::PhantomData<C>,
}

impl<'a, C: TypeContext<'a>> Serialize for SerializableAccountsDb<'a, C> {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        C::serialize_accounts_db_fields(serializer, self)
    }
}

#[cfg(RUSTC_WITH_SPECIALIZATION)]
impl<'a, C> safecoin_frozen_abi::abi_example::IgnoreAsHelper for SerializableAccountsDb<'a, C> {}

#[allow(clippy::too_many_arguments)]
fn reconstruct_bank_from_fields<E>(
    bank_fields: BankFieldsToDeserialize,
    snapshot_accounts_db_fields: SnapshotAccountsDbFields<E>,
    genesis_config: &GenesisConfig,
    account_paths: &[PathBuf],
    unpacked_append_vec_map: UnpackedAppendVecMap,
    debug_keys: Option<Arc<HashSet<Pubkey>>>,
    additional_builtins: Option<&Builtins>,
    account_secondary_indexes: AccountSecondaryIndexes,
    caching_enabled: bool,
    limit_load_slot_count_from_snapshot: Option<usize>,
    shrink_ratio: AccountShrinkThreshold,
    verify_index: bool,
    accounts_db_config: Option<AccountsDbConfig>,
    accounts_update_notifier: Option<AccountsUpdateNotifier>,
) -> Result<Bank, Error>
where
    E: SerializableStorage + std::marker::Sync,
{
    let (accounts_db, reconstructed_accounts_db_info) = reconstruct_accountsdb_from_fields(
        snapshot_accounts_db_fields,
        account_paths,
        unpacked_append_vec_map,
        genesis_config,
        account_secondary_indexes,
        caching_enabled,
        limit_load_slot_count_from_snapshot,
        shrink_ratio,
        verify_index,
        accounts_db_config,
        accounts_update_notifier,
    )?;

    let bank_rc = BankRc::new(Accounts::new_empty(accounts_db), bank_fields.slot);

    // if limit_load_slot_count_from_snapshot is set, then we need to side-step some correctness checks beneath this call
    let debug_do_not_add_builtins = limit_load_slot_count_from_snapshot.is_some();
    let bank = Bank::new_from_fields(
        bank_rc,
        genesis_config,
        bank_fields,
        debug_keys,
        additional_builtins,
        debug_do_not_add_builtins,
        reconstructed_accounts_db_info.accounts_data_len,
    );

    info!("rent_collector: {:?}", bank.rent_collector());

    Ok(bank)
}

fn reconstruct_single_storage(
    slot: &Slot,
    append_vec_path: &Path,
    current_len: usize,
    append_vec_id: AppendVecId,
) -> io::Result<Arc<AccountStorageEntry>> {
    let (accounts, num_accounts) = AppendVec::new_from_file(append_vec_path, current_len)?;
    Ok(Arc::new(AccountStorageEntry::new_existing(
        *slot,
        append_vec_id,
        accounts,
        num_accounts,
    )))
}

fn remap_append_vec_file(
    slot: Slot,
    old_append_vec_id: SerializedAppendVecId,
    append_vec_path: &Path,
    next_append_vec_id: &AtomicAppendVecId,
    num_collisions: &AtomicUsize,
) -> io::Result<(AppendVecId, PathBuf)> {
    // Remap the AppendVec ID to handle any duplicate IDs that may have previously existed
    // due to full snapshots and incremental snapshots generated from different nodes
    let (remapped_append_vec_id, remapped_append_vec_path) = loop {
        let remapped_append_vec_id = next_append_vec_id.fetch_add(1, Ordering::AcqRel);
        let remapped_file_name = AppendVec::file_name(slot, remapped_append_vec_id);
        let remapped_append_vec_path = append_vec_path.parent().unwrap().join(&remapped_file_name);

        // Break out of the loop in the following situations:
        // 1. The new ID is the same as the original ID.  This means we do not need to
        //    rename the file, since the ID is the "correct" one already.
        // 2. There is not a file already at the new path.  This means it is safe to
        //    rename the file to this new path.
        //    **DEVELOPER NOTE:**  Keep this check last so that it can short-circuit if
        //    possible.
        if old_append_vec_id == remapped_append_vec_id as SerializedAppendVecId
            || std::fs::metadata(&remapped_append_vec_path).is_err()
        {
            break (remapped_append_vec_id, remapped_append_vec_path);
        }

        // If we made it this far, a file exists at the new path.  Record the collision
        // and try again.
        num_collisions.fetch_add(1, Ordering::Relaxed);
    };
    // Only rename the file if the new ID is actually different from the original.
    if old_append_vec_id != remapped_append_vec_id as SerializedAppendVecId {
        std::fs::rename(append_vec_path, &remapped_append_vec_path)?;
    }

    Ok((remapped_append_vec_id, remapped_append_vec_path))
}
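
// Simplified model of the remap loop above, with a `HashSet` of IDs standing in for "a file
// already exists at the remapped path". It shows the two exit conditions (the ID is unchanged,
// or the candidate ID is unused) and how collisions are counted. Illustrative only; the real
// function renames files on disk.
#[cfg(test)]
#[allow(dead_code)]
fn example_remap_append_vec_id(
    old_append_vec_id: SerializedAppendVecId,
    ids_already_in_use: &HashSet<AppendVecId>,
    next_append_vec_id: &AtomicAppendVecId,
    num_collisions: &AtomicUsize,
) -> AppendVecId {
    loop {
        let remapped_append_vec_id = next_append_vec_id.fetch_add(1, Ordering::AcqRel);
        if old_append_vec_id == remapped_append_vec_id as SerializedAppendVecId
            || !ids_already_in_use.contains(&remapped_append_vec_id)
        {
            break remapped_append_vec_id;
        }
        num_collisions.fetch_add(1, Ordering::Relaxed);
    }
}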

fn remap_and_reconstruct_single_storage(
    slot: Slot,
    old_append_vec_id: SerializedAppendVecId,
    current_len: usize,
    append_vec_path: &Path,
    next_append_vec_id: &AtomicAppendVecId,
    num_collisions: &AtomicUsize,
) -> io::Result<Arc<AccountStorageEntry>> {
    let (remapped_append_vec_id, remapped_append_vec_path) = remap_append_vec_file(
        slot,
        old_append_vec_id,
        append_vec_path,
        next_append_vec_id,
        num_collisions,
    )?;
    let storage = reconstruct_single_storage(
        &slot,
        &remapped_append_vec_path,
        current_len,
        remapped_append_vec_id,
    )?;
    Ok(storage)
}

fn remap_and_reconstruct_slot_storage<E>(
    slot: Slot,
    slot_storage: &[E],
    unpacked_append_vec_map: &UnpackedAppendVecMap,
    next_append_vec_id: &AtomicAppendVecId,
    num_collisions: &AtomicUsize,
) -> Result<HashMap<AppendVecId, Arc<AccountStorageEntry>>, Error>
where
    E: SerializableStorage,
{
    slot_storage
        .iter()
        .map(|storage_entry| {
            let file_name = AppendVec::file_name(slot, storage_entry.id());
            let append_vec_path = unpacked_append_vec_map.get(&file_name).ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("{} not found in unpacked append vecs", file_name),
                )
            })?;

            let new_storage_entry = remap_and_reconstruct_single_storage(
                slot,
                storage_entry.id(),
                storage_entry.current_len(),
                append_vec_path,
                next_append_vec_id,
                num_collisions,
            )?;
            Ok((new_storage_entry.append_vec_id(), new_storage_entry))
        })
        .collect::<Result<HashMap<AppendVecId, _>, Error>>()
}

fn remap_and_reconstruct_storages<E>(
    snapshot_storages: Vec<(Slot, Vec<E>)>,
    unpacked_append_vec_map: &UnpackedAppendVecMap,
    next_append_vec_id: &AtomicAppendVecId,
    num_collisions: &AtomicUsize,
) -> Result<HashMap<Slot, HashMap<AppendVecId, Arc<AccountStorageEntry>>>, Error>
where
    E: SerializableStorage + std::marker::Sync,
{
    snapshot_storages
        .par_iter()
        .map(|(slot, slot_storage)| {
            Ok((
                *slot,
                remap_and_reconstruct_slot_storage(
                    *slot,
                    slot_storage,
                    unpacked_append_vec_map,
                    next_append_vec_id,
                    num_collisions,
                )?,
            ))
        })
        .collect::<Result<HashMap<Slot, _>, Error>>()
}

/// This struct contains side-info while reconstructing the accounts DB from fields.
#[derive(Debug, Default, Copy, Clone)]
struct ReconstructedAccountsDbInfo {
    accounts_data_len: u64,
}

#[allow(clippy::too_many_arguments)]
fn reconstruct_accountsdb_from_fields<E>(
    snapshot_accounts_db_fields: SnapshotAccountsDbFields<E>,
    account_paths: &[PathBuf],
    unpacked_append_vec_map: UnpackedAppendVecMap,
    genesis_config: &GenesisConfig,
    account_secondary_indexes: AccountSecondaryIndexes,
    caching_enabled: bool,
    limit_load_slot_count_from_snapshot: Option<usize>,
    shrink_ratio: AccountShrinkThreshold,
    verify_index: bool,
    accounts_db_config: Option<AccountsDbConfig>,
    accounts_update_notifier: Option<AccountsUpdateNotifier>,
) -> Result<(AccountsDb, ReconstructedAccountsDbInfo), Error>
where
    E: SerializableStorage + std::marker::Sync,
{
    let mut accounts_db = AccountsDb::new_with_config(
        account_paths.to_vec(),
        &genesis_config.cluster_type,
        account_secondary_indexes,
        caching_enabled,
        shrink_ratio,
        accounts_db_config,
        accounts_update_notifier,
    );

    let AccountsDbFields(
        snapshot_storages,
        snapshot_version,
        snapshot_slot,
        snapshot_bank_hash_info,
        snapshot_historical_roots,
        snapshot_historical_roots_with_hash,
    ) = snapshot_accounts_db_fields.collapse_into()?;

    let snapshot_storages = snapshot_storages.into_iter().collect::<Vec<_>>();

    // Ensure all account paths exist
    for path in &accounts_db.paths {
        std::fs::create_dir_all(path)
            .unwrap_or_else(|err| panic!("Failed to create directory {}: {}", path.display(), err));
    }

    reconstruct_historical_roots(
        &accounts_db,
        snapshot_historical_roots,
        snapshot_historical_roots_with_hash,
    );

    // Remap the deserialized AppendVec paths to point to correct local paths
    let num_collisions = AtomicUsize::new(0);
    let next_append_vec_id = AtomicAppendVecId::new(0);
    let (mut storage, measure_remap) = measure!(remap_and_reconstruct_storages(
        snapshot_storages,
        &unpacked_append_vec_map,
        &next_append_vec_id,
        &num_collisions
    )?);

    // Discard any slots with no storage entries.  This can happen if a non-root slot was
    // serialized, but non-root stores should not be included in the snapshot.
    storage.retain(|_slot, stores| !stores.is_empty());
    assert!(
        !storage.is_empty(),
        "At least one storage entry must exist from deserializing stream"
    );

    let next_append_vec_id = next_append_vec_id.load(Ordering::Acquire);
    let max_append_vec_id = next_append_vec_id - 1;
    assert!(
        max_append_vec_id <= AppendVecId::MAX / 2,
        "Storage id {} larger than allowed max",
        max_append_vec_id
    );

    // Process the deserialized data and set the necessary fields on the AccountsDb
    accounts_db
        .bank_hashes
        .write()
        .unwrap()
        .insert(snapshot_slot, snapshot_bank_hash_info);
    accounts_db.storage.map.extend(
        storage
            .into_iter()
            .map(|(slot, slot_storage_entry)| (slot, Arc::new(RwLock::new(slot_storage_entry)))),
    );
    accounts_db
        .next_id
        .store(next_append_vec_id, Ordering::Release);
    accounts_db
        .write_version
        .fetch_add(snapshot_version, Ordering::Release);

    let mut measure_notify = Measure::start("accounts_notify");

    let accounts_db = Arc::new(accounts_db);
    let accounts_db_clone = accounts_db.clone();
    let handle = Builder::new()
        .name("solNfyAccRestor".to_string())
        .spawn(move || {
            accounts_db_clone.notify_account_restore_from_snapshot();
        })
        .unwrap();

    let IndexGenerationInfo {
        accounts_data_len,
        rent_paying_accounts_by_partition,
    } = accounts_db.generate_index(
        limit_load_slot_count_from_snapshot,
        verify_index,
        genesis_config,
    );
    accounts_db
        .accounts_index
        .rent_paying_accounts_by_partition
        .set(rent_paying_accounts_by_partition)
        .unwrap();

    accounts_db.maybe_add_filler_accounts(
        &genesis_config.epoch_schedule,
        &genesis_config.rent,
        snapshot_slot,
    );

    handle.join().unwrap();
    measure_notify.stop();

    datapoint_info!(
        "reconstruct_accountsdb_from_fields()",
        ("remap-time-us", measure_remap.as_us(), i64),
        (
            "remap-collisions",
            num_collisions.load(Ordering::Relaxed),
            i64
        ),
        ("accountsdb-notify-at-start-us", measure_notify.as_us(), i64),
    );

    Ok((
        Arc::try_unwrap(accounts_db).unwrap(),
        ReconstructedAccountsDbInfo { accounts_data_len },
    ))
}

/// populate 'historical_roots' from 'snapshot_historical_roots' and 'snapshot_historical_roots_with_hash'
fn reconstruct_historical_roots(
    accounts_db: &AccountsDb,
    mut snapshot_historical_roots: Vec<Slot>,
    snapshot_historical_roots_with_hash: Vec<(Slot, Hash)>,
) {
    // inflate 'historical_roots'
    // inserting into 'historical_roots' needs to be in order
    // combine the slots into 1 vec, then sort
    // dups are ok
    snapshot_historical_roots.extend(
        snapshot_historical_roots_with_hash
            .into_iter()
            .map(|(root, _)| root),
    );
    snapshot_historical_roots.sort_unstable();
    let mut roots_tracker = accounts_db.accounts_index.roots_tracker.write().unwrap();
    snapshot_historical_roots.into_iter().for_each(|root| {
        roots_tracker.historical_roots.insert(root);
    });
}
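
// Tiny, self-contained illustration of the ordering rule in `reconstruct_historical_roots()`:
// roots from both inputs are merged into one vec and sorted before insertion, and duplicate
// slots are acceptable. The slot values below are arbitrary.
#[test]
fn example_merge_historical_roots_ordering() {
    let snapshot_historical_roots: Vec<Slot> = vec![5, 1, 9];
    let snapshot_historical_roots_with_hash = vec![(3, Hash::default()), (5, Hash::default())];

    let mut merged = snapshot_historical_roots;
    merged.extend(
        snapshot_historical_roots_with_hash
            .into_iter()
            .map(|(root, _)| root),
    );
    merged.sort_unstable();

    // Duplicates are fine; insertion into the roots tracker only needs ascending order.
    assert_eq!(merged, vec![1, 3, 5, 5, 9]);
}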