solana_runtime/
snapshot_utils.rs

1use {
2    crate::{
3        bank::{BankFieldsToSerialize, BankSlotDelta},
4        serde_snapshot::{
5            self, BankIncrementalSnapshotPersistence, ExtraFieldsToSerialize, SnapshotStreams,
6        },
7        snapshot_archive_info::{
8            FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
9            SnapshotArchiveInfoGetter,
10        },
11        snapshot_bank_utils,
12        snapshot_config::SnapshotConfig,
13        snapshot_hash::SnapshotHash,
14        snapshot_package::{SnapshotKind, SnapshotPackage},
15        snapshot_utils::snapshot_storage_rebuilder::{
16            RebuiltSnapshotStorage, SnapshotStorageRebuilder,
17        },
18    },
19    bzip2::bufread::BzDecoder,
20    crossbeam_channel::Sender,
21    flate2::read::GzDecoder,
22    lazy_static::lazy_static,
23    log::*,
24    regex::Regex,
25    solana_accounts_db::{
26        account_storage::{meta::StoredMetaWriteVersion, AccountStorageMap},
27        accounts_db::{stats::BankHashStats, AccountStorageEntry, AtomicAccountsFileId},
28        accounts_file::{AccountsFile, AccountsFileError, InternalsForArchive, StorageAccess},
29        accounts_hash::{AccountsDeltaHash, AccountsHash},
30        epoch_accounts_hash::EpochAccountsHash,
31        hardened_unpack::{self, ParallelSelector, UnpackError},
32        shared_buffer_reader::{SharedBuffer, SharedBufferReader},
33        utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
34    },
35    solana_measure::{measure::Measure, measure_time, measure_us},
36    solana_sdk::{
37        clock::{Epoch, Slot},
38        hash::Hash,
39    },
40    std::{
41        cmp::Ordering,
42        collections::{HashMap, HashSet},
43        fmt, fs,
44        io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
45        mem,
46        num::NonZeroUsize,
47        ops::RangeInclusive,
48        path::{Path, PathBuf},
49        process::ExitStatus,
50        str::FromStr,
51        sync::Arc,
52        thread::{Builder, JoinHandle},
53    },
54    tar::{self, Archive},
55    tempfile::TempDir,
56    thiserror::Error,
57};
58#[cfg(feature = "dev-context-only-utils")]
59use {
60    hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
61    solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
62};
63
64mod archive_format;
65pub mod snapshot_storage_rebuilder;
66pub use archive_format::*;
67
/// File name of the serialized status cache inside a bank snapshot directory
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
/// File name of the snapshot version file inside a bank snapshot directory
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
/// Marker file whose presence flags a bank snapshot directory as completely written
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
/// Subdirectory of a bank snapshot directory holding symlinks to the account snapshot dirs
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
/// Name of the snapshot archive download subdirectory (not referenced in this section)
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
/// File inside a bank snapshot directory recording the slot of its full snapshot
pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8; // bytes
const VERSION_STRING_V1_2_0: &str = "1.2.0";
/// Name prefix of temporary entries created while archiving a snapshot package
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
/// Extension marking a bank snapshot file as "pre" accounts hash
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
// `NonZeroUsize::new()` is a `const fn`, and `match` + `panic!` are allowed during const
// evaluation, so these constants are built without `unsafe`.  A zero value here would be a
// compile-time error rather than undefined behavior.
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(2) {
        Some(n) => n,
        None => panic!("DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN must be non-zero"),
    };
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(4) {
        Some(n) => n,
        None => panic!("DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN must be non-zero"),
    };
/// Full snapshot archive file names: `snapshot-<slot>-<hash>.<ext>`
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
/// Incremental snapshot archive file names: `incremental-snapshot-<base>-<slot>-<hash>.<ext>`
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
89
90#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
91pub enum SnapshotVersion {
92    #[default]
93    V1_2_0,
94}
95
96impl fmt::Display for SnapshotVersion {
97    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
98        f.write_str(From::from(*self))
99    }
100}
101
102impl From<SnapshotVersion> for &'static str {
103    fn from(snapshot_version: SnapshotVersion) -> &'static str {
104        match snapshot_version {
105            SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
106        }
107    }
108}
109
110impl FromStr for SnapshotVersion {
111    type Err = &'static str;
112
113    fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
114        // Remove leading 'v' or 'V' from slice
115        let version_string = if version_string
116            .get(..1)
117            .map_or(false, |s| s.eq_ignore_ascii_case("v"))
118        {
119            &version_string[1..]
120        } else {
121            version_string
122        };
123        match version_string {
124            VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
125            _ => Err("unsupported snapshot version"),
126        }
127    }
128}
129
130impl SnapshotVersion {
131    pub fn as_str(self) -> &'static str {
132        <&str as From<Self>>::from(self)
133    }
134}
135
136/// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and
137/// the kind of the snapshot.
138#[derive(PartialEq, Eq, Debug)]
139pub struct BankSnapshotInfo {
140    /// Slot of the bank
141    pub slot: Slot,
142    /// Snapshot kind
143    pub snapshot_kind: BankSnapshotKind,
144    /// Path to the bank snapshot directory
145    pub snapshot_dir: PathBuf,
146    /// Snapshot version
147    pub snapshot_version: SnapshotVersion,
148}
149
150impl PartialOrd for BankSnapshotInfo {
151    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
152        Some(self.cmp(other))
153    }
154}
155
156// Order BankSnapshotInfo by slot (ascending), which practically is sorting chronologically
157impl Ord for BankSnapshotInfo {
158    fn cmp(&self, other: &Self) -> Ordering {
159        self.slot.cmp(&other.slot)
160    }
161}
162
163impl BankSnapshotInfo {
164    pub fn new_from_dir(
165        bank_snapshots_dir: impl AsRef<Path>,
166        slot: Slot,
167    ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
168        // check this directory to see if there is a BankSnapshotPre and/or
169        // BankSnapshotPost file
170        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
171
172        if !bank_snapshot_dir.is_dir() {
173            return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
174                bank_snapshot_dir,
175            ));
176        }
177
178        // Among the files checks, the completion flag file check should be done first to avoid the later
179        // I/O errors.
180
181        // There is a time window from the slot directory being created, and the content being completely
182        // filled.  Check the completion to avoid using a highest found slot directory with missing content.
183        if !is_bank_snapshot_complete(&bank_snapshot_dir) {
184            return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
185        }
186
187        let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
188        if !status_cache_file.is_file() {
189            return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
190                status_cache_file,
191            ));
192        }
193
194        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
195        let version_str = snapshot_version_from_file(&version_path).or(Err(
196            SnapshotNewFromDirError::MissingVersionFile(version_path),
197        ))?;
198        let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
199            .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
200
201        let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
202        let bank_snapshot_pre_path =
203            bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
204
205        // NOTE: It is important that checking for "Pre" happens before "Post.
206        //
207        // Consider the scenario where AccountsHashVerifier is actively processing an
208        // AccountsPackage for a snapshot/slot; if AHV is in the middle of reserializing the
209        // bank snapshot file (writing the new "Post" file), and then the process dies,
210        // there will be an incomplete "Post" file on disk.  We do not want only the existence of
211        // this "Post" file to be sufficient for deciding the snapshot kind as "Post".  More so,
212        // "Post" *requires* the *absence* of a "Pre" file.
213        let snapshot_kind = if bank_snapshot_pre_path.is_file() {
214            BankSnapshotKind::Pre
215        } else if bank_snapshot_post_path.is_file() {
216            BankSnapshotKind::Post
217        } else {
218            return Err(SnapshotNewFromDirError::MissingSnapshotFile(
219                bank_snapshot_dir,
220            ));
221        };
222
223        Ok(BankSnapshotInfo {
224            slot,
225            snapshot_kind,
226            snapshot_dir: bank_snapshot_dir,
227            snapshot_version,
228        })
229    }
230
231    pub fn snapshot_path(&self) -> PathBuf {
232        let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
233
234        let ext = match self.snapshot_kind {
235            BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
236            BankSnapshotKind::Post => "",
237        };
238        bank_snapshot_path.set_extension(ext);
239
240        bank_snapshot_path
241    }
242}
243
/// Whether a bank snapshot was serialized before or after its accounts hash was calculated.
///
/// Traditionally the accounts hash was calculated before a bank was serialized.  That calculation
/// is slow, so it is now offloaded.  The bank serialization format did not change, though, so this
/// marker is what tells the two cases apart:
///
/// - When a bank snapshot is first taken it does not yet contain the calculated accounts hash:
///   it is "pre" accounts hash (its bank file carries the `pre` extension).
/// - Once the accounts hash has been calculated, the bank snapshot is re-serialized and is then
///   "post" accounts hash.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BankSnapshotKind {
    /// The accounts hash has *not* been calculated for this bank snapshot yet
    Pre,
    /// The accounts hash *has* been calculated for this bank snapshot
    Post,
}
259
/// Where a bank is constructed from when loading from a snapshot.
///
/// Traditionally a bank was always constructed from a snapshot archive; it can now also be
/// constructed directly from a bank snapshot directory.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotFrom {
    /// Build from the snapshot archive
    Archive,
    /// Build directly from the bank snapshot directory
    Dir,
}
270
/// Paths of the snapshot root files to rebuild from.
///
/// Covers rebuilding from a full snapshot alone (`incremental_snapshot_root_file_path` is `None`)
/// as well as from a full snapshot plus an incremental snapshot.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    /// Root file of the full snapshot
    pub full_snapshot_root_file_path: PathBuf,
    /// Root file of the incremental snapshot, when one is used
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}
278
279/// Helper type to bundle up the results from `unarchive_snapshot()`
280#[derive(Debug)]
281pub struct UnarchivedSnapshot {
282    #[allow(dead_code)]
283    unpack_dir: TempDir,
284    pub storage: AccountStorageMap,
285    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
286    pub measure_untar: Measure,
287}
288
289/// Helper type for passing around the unpacked snapshots dir and the snapshot version together
290#[derive(Debug)]
291pub struct UnpackedSnapshotsDirAndVersion {
292    pub unpacked_snapshots_dir: PathBuf,
293    pub snapshot_version: SnapshotVersion,
294}
295
296/// Helper type for passing around account storage map and next append vec id
297/// for reconstructing accounts from a snapshot
298pub(crate) struct StorageAndNextAccountsFileId {
299    pub storage: AccountStorageMap,
300    pub next_append_vec_id: AtomicAccountsFileId,
301}
302
/// Top-level error type for snapshot utilities; it is the error type of this module's [`Result`]
/// alias.
///
/// Variants marked `#[from]` can be produced from the wrapped error type with `?`.
#[derive(Error, Debug)]
// NOTE(review): presumably allowed because variant sizes differ widely (e.g. `MismatchedSlotHash`
// carries two `(Slot, SnapshotHash)` tuples).
#[allow(clippy::large_enum_variant)]
pub enum SnapshotError {
    #[error("I/O error: {0}")]
    Io(#[from] IoError),

    #[error("AccountsFile error: {0}")]
    AccountsFileError(#[from] AccountsFileError),

    #[error("serialization error: {0}")]
    Serialize(#[from] bincode::Error),

    #[error("crossbeam send error: {0}")]
    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),

    #[error("archive generation failure {0}")]
    ArchiveGenerationFailure(ExitStatus),

    #[error("Unpack error: {0}")]
    UnpackError(#[from] UnpackError),

    /// An I/O error plus a static label (`{1}`) naming where it happened.  The I/O error is not
    /// tagged `#[source]`.
    #[error("source({1}) - I/O error: {0}")]
    IoWithSource(IoError, &'static str),

    #[error("could not get file name from path '{0}'")]
    PathToFileNameError(PathBuf),

    #[error("could not get str from file name '{0}'")]
    FileNameToStrError(PathBuf),

    #[error("could not parse snapshot archive's file name '{0}'")]
    ParseSnapshotArchiveFileNameError(String),

    /// Payload is (full snapshot slot, incremental snapshot base slot).
    #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
    MismatchedBaseSlot(Slot, Slot),

    #[error("no snapshot archives to load from '{0}'")]
    NoSnapshotArchives(PathBuf),

    /// Payload is (deserialized bank's slot/hash, snapshot archive info's slot/hash).
    #[error("snapshot has mismatch: deserialized bank: {0:?}, snapshot archive info: {1:?}")]
    MismatchedSlotHash((Slot, SnapshotHash), (Slot, SnapshotHash)),

    #[error("snapshot slot deltas are invalid: {0}")]
    VerifySlotDeltas(#[from] VerifySlotDeltasError),

    #[error("snapshot epoch stakes are invalid: {0}")]
    VerifyEpochStakes(#[from] VerifyEpochStakesError),

    #[error("bank_snapshot_info new_from_dir failed: {0}")]
    NewFromDir(#[from] SnapshotNewFromDirError),

    #[error("invalid snapshot dir path '{0}'")]
    InvalidSnapshotDirPath(PathBuf),

    #[error("invalid AppendVec path '{0}'")]
    InvalidAppendVecPath(PathBuf),

    #[error("invalid account path '{0}'")]
    InvalidAccountPath(PathBuf),

    #[error("no valid snapshot dir found under '{0}'")]
    NoSnapshotSlotDir(PathBuf),

    #[error("snapshot dir account paths mismatching")]
    AccountPathsMismatch,

    #[error("failed to add bank snapshot for slot {1}: {0}")]
    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),

    #[error("failed to archive snapshot package: {0}")]
    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),

    #[error("failed to rebuild snapshot storages: {0}")]
    RebuildStorages(String),
}
378
/// Errors that can happen in `BankSnapshotInfo::new_from_dir()`
#[derive(Error, Debug)]
pub enum SnapshotNewFromDirError {
    /// The slot's bank snapshot path is not a directory
    #[error("invalid bank snapshot directory '{0}'")]
    InvalidBankSnapshotDir(PathBuf),

    #[error("missing status cache file '{0}'")]
    MissingStatusCacheFile(PathBuf),

    /// Also returned when the version file exists but could not be read
    #[error("missing version file '{0}'")]
    MissingVersionFile(PathBuf),

    /// Payload is the version string read from the version file
    #[error("invalid snapshot version '{0}'")]
    InvalidVersion(String),

    /// The directory lacks the "state complete" marker file
    #[error("snapshot directory incomplete '{0}'")]
    IncompleteDir(PathBuf),

    /// Neither a "pre" nor a "post" bank snapshot file exists; payload is the directory
    #[error("missing snapshot file '{0}'")]
    MissingSnapshotFile(PathBuf),
}
399
/// Result type for snapshot utilities, with [`SnapshotError`] as the error.
pub type Result<T> = std::result::Result<T, SnapshotError>;
401
/// Errors that can happen in `verify_slot_deltas()`
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifySlotDeltasError {
    /// Payload is (actual entry count, maximum allowed)
    #[error("too many entries: {0} (max: {1})")]
    TooManyEntries(usize, usize),

    #[error("slot {0} is not a root")]
    SlotIsNotRoot(Slot),

    /// Payload is (offending slot, bank slot).  NOTE(review): the variant name says "max root"
    /// while the message says "bank slot".
    #[error("slot {0} is greater than bank slot {1}")]
    SlotGreaterThanMaxRoot(Slot, Slot),

    #[error("slot {0} has multiple entries")]
    SlotHasMultipleEntries(Slot),

    #[error("slot {0} was not found in slot history")]
    SlotNotFoundInHistory(Slot),

    #[error("slot {0} was in history but missing from slot deltas")]
    SlotNotFoundInDeltas(Slot),

    #[error("slot history is bad and cannot be used to verify slot deltas")]
    BadSlotHistory,
}
426
/// Errors that can happen in `verify_epoch_stakes()`
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifyEpochStakesError {
    /// Payload is (offending epoch, maximum allowed epoch)
    #[error("epoch {0} is greater than the max {1}")]
    EpochGreaterThanMax(Epoch, Epoch),

    /// Payload is (missing epoch, range of epochs that must have stakes)
    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
    StakesNotFound(Epoch, RangeInclusive<Epoch>),
}
436
/// Errors that can happen in `add_bank_snapshot()`
///
/// Each variant names the step that failed; `PathBuf` payloads identify the file or directory
/// involved.
#[derive(Error, Debug)]
pub enum AddBankSnapshotError {
    #[error("bank snapshot dir already exists '{0}'")]
    SnapshotDirAlreadyExists(PathBuf),

    #[error("failed to create snapshot dir '{1}': {0}")]
    CreateSnapshotDir(#[source] IoError, PathBuf),

    #[error("failed to flush storage '{1}': {0}")]
    FlushStorage(#[source] AccountsFileError, PathBuf),

    #[error("failed to hard link storages: {0}")]
    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),

    // `SnapshotError` is boxed here because `SnapshotError::AddBankSnapshot` itself contains this
    // enum; without the box the two types would be infinitely sized.
    #[error("failed to serialize bank: {0}")]
    SerializeBank(#[source] Box<SnapshotError>),

    #[error("failed to serialize status cache: {0}")]
    SerializeStatusCache(#[source] Box<SnapshotError>),

    #[error("failed to write snapshot version file '{1}': {0}")]
    WriteSnapshotVersionFile(#[source] IoError, PathBuf),

    #[error("failed to mark snapshot as 'complete': failed to create file '{1}': {0}")]
    CreateStateCompleteFile(#[source] IoError, PathBuf),
}
464
/// Errors that can happen in `archive_snapshot_package()`
///
/// Each variant names the step that failed and wraps the underlying I/O error as its source.  For
/// the symlink/move variants the two paths are (from, to), matching the messages.
#[derive(Error, Debug)]
pub enum ArchiveSnapshotPackageError {
    #[error("failed to create archive path '{1}': {0}")]
    CreateArchiveDir(#[source] IoError, PathBuf),

    #[error("failed to create staging dir inside '{1}': {0}")]
    CreateStagingDir(#[source] IoError, PathBuf),

    #[error("failed to create snapshot staging dir '{1}': {0}")]
    CreateSnapshotStagingDir(#[source] IoError, PathBuf),

    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to create archive file '{1}': {0}")]
    CreateArchiveFile(#[source] IoError, PathBuf),

    #[error("failed to archive version file: {0}")]
    ArchiveVersionFile(#[source] IoError),

    #[error("failed to archive snapshots dir: {0}")]
    ArchiveSnapshotsDir(#[source] IoError),

    #[error("failed to archive account storage file '{1}': {0}")]
    ArchiveAccountStorageFile(#[source] IoError, PathBuf),

    #[error("failed to archive snapshot: {0}")]
    FinishArchive(#[source] IoError),

    #[error("failed to create encoder: {0}")]
    CreateEncoder(#[source] IoError),

    #[error("failed to encode archive: {0}")]
    FinishEncoder(#[source] IoError),

    #[error("failed to query archive metadata '{1}': {0}")]
    QueryArchiveMetadata(#[source] IoError, PathBuf),

    #[error("failed to move archive from '{1}' to '{2}': {0}")]
    MoveArchive(#[source] IoError, PathBuf, PathBuf),
}
516
/// Errors that can happen in `hard_link_storages_to_snapshot()`
#[derive(Error, Debug)]
pub enum HardLinkStoragesToSnapshotError {
    #[error("failed to create accounts hard links dir '{1}': {0}")]
    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),

    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),

    /// Payload paths are (existing storage file, new hard link)
    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
}
529
/// Errors that can happen in `get_snapshot_accounts_hardlink_dir()`
#[derive(Error, Debug)]
pub enum GetSnapshotAccountsHardLinkDirError {
    #[error("invalid account storage path '{0}'")]
    GetAccountPath(PathBuf),

    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
    SymlinkSnapshotHardLinkDir {
        /// The underlying I/O error
        source: IoError,
        /// The directory the symlink should point at
        original: PathBuf,
        /// The symlink that could not be created
        link: PathBuf,
    },
}
546
547/// The account snapshot directories under <account_path>/snapshot/<slot> contain account files hardlinked
548/// from <account_path>/run taken at snapshot <slot> time.  They are referenced by the symlinks from the
549/// bank snapshot dir snapshot/<slot>/accounts_hardlinks/.  We observed that sometimes the bank snapshot dir
550/// could be deleted but the account snapshot directories were left behind, possibly by some manual operations
551/// or some legacy code not using the symlinks to clean up the account snapshot hardlink directories.
552/// This function cleans up any account snapshot directories that are no longer referenced by the bank
553/// snapshot dirs, to ensure proper snapshot operations.
554pub fn clean_orphaned_account_snapshot_dirs(
555    bank_snapshots_dir: impl AsRef<Path>,
556    account_snapshot_paths: &[PathBuf],
557) -> IoResult<()> {
558    // Create the HashSet of the account snapshot hardlink directories referenced by the snapshot dirs.
559    // This is used to clean up any hardlinks that are no longer referenced by the snapshot dirs.
560    let mut account_snapshot_dirs_referenced = HashSet::new();
561    let snapshots = get_bank_snapshots(bank_snapshots_dir);
562    for snapshot in snapshots {
563        let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
564        // loop through entries in the snapshot_hardlink_dir, read the symlinks, add the target to the HashSet
565        let read_dir = fs::read_dir(&account_hardlinks_dir).map_err(|err| {
566            IoError::other(format!(
567                "failed to read account hardlinks dir '{}': {err}",
568                account_hardlinks_dir.display(),
569            ))
570        })?;
571        for entry in read_dir {
572            let path = entry?.path();
573            let target = fs::read_link(&path).map_err(|err| {
574                IoError::other(format!(
575                    "failed to read symlink '{}': {err}",
576                    path.display(),
577                ))
578            })?;
579            account_snapshot_dirs_referenced.insert(target);
580        }
581    }
582
583    // loop through the account snapshot hardlink directories, if the directory is not in the account_snapshot_dirs_referenced set, delete it
584    for account_snapshot_path in account_snapshot_paths {
585        let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
586            IoError::other(format!(
587                "failed to read account snapshot dir '{}': {err}",
588                account_snapshot_path.display(),
589            ))
590        })?;
591        for entry in read_dir {
592            let path = entry?.path();
593            if !account_snapshot_dirs_referenced.contains(&path) {
594                info!(
595                    "Removing orphaned account snapshot hardlink directory '{}'...",
596                    path.display()
597                );
598                move_and_async_delete_path(&path);
599            }
600        }
601    }
602
603    Ok(())
604}
605
606/// Purges incomplete bank snapshots
607pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
608    let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
609        // If we cannot read the bank snapshots dir, then there's nothing to do
610        return;
611    };
612
613    let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
614
615    let incomplete_dirs: Vec<_> = read_dir_iter
616        .filter_map(|entry| entry.ok())
617        .map(|entry| entry.path())
618        .filter(|path| path.is_dir())
619        .filter(is_incomplete)
620        .collect();
621
622    // attempt to purge all the incomplete directories; do not exit early
623    for incomplete_dir in incomplete_dirs {
624        let result = purge_bank_snapshot(&incomplete_dir);
625        match result {
626            Ok(_) => info!(
627                "Purged incomplete snapshot dir: {}",
628                incomplete_dir.display()
629            ),
630            Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
631        }
632    }
633}
634
635/// Is the bank snapshot complete?
636fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
637    let state_complete_path = bank_snapshot_dir
638        .as_ref()
639        .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
640    state_complete_path.is_file()
641}
642
643/// Writes the full snapshot slot file into the bank snapshot dir
644pub fn write_full_snapshot_slot_file(
645    bank_snapshot_dir: impl AsRef<Path>,
646    full_snapshot_slot: Slot,
647) -> IoResult<()> {
648    let full_snapshot_slot_path = bank_snapshot_dir
649        .as_ref()
650        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
651    fs::write(
652        &full_snapshot_slot_path,
653        Slot::to_le_bytes(full_snapshot_slot),
654    )
655    .map_err(|err| {
656        IoError::other(format!(
657            "failed to write full snapshot slot file '{}': {err}",
658            full_snapshot_slot_path.display(),
659        ))
660    })
661}
662
663// Reads the full snapshot slot file from the bank snapshot dir
664pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<Slot> {
665    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
666    let full_snapshot_slot_path = bank_snapshot_dir
667        .as_ref()
668        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
669    let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
670    if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
671        let error_message = format!(
672            "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
673            full_snapshot_slot_path.display(),
674            full_snapshot_slot_file_metadata.len(),
675            SLOT_SIZE,
676        );
677        return Err(IoError::other(error_message));
678    }
679    let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
680    let mut buffer = [0; SLOT_SIZE];
681    full_snapshot_slot_file.read_exact(&mut buffer)?;
682    let slot = Slot::from_le_bytes(buffer);
683    Ok(slot)
684}
685
686/// Gets the highest, loadable, bank snapshot
687///
688/// The highest bank snapshot is the one with the highest slot.
689/// To be loadable, the bank snapshot must be a BankSnapshotKind::Post.
690/// And if we're generating snapshots (e.g. running a normal validator), then
691/// the full snapshot file's slot must match the highest full snapshot archive's.
692pub fn get_highest_loadable_bank_snapshot(
693    snapshot_config: &SnapshotConfig,
694) -> Option<BankSnapshotInfo> {
695    let highest_bank_snapshot =
696        get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
697
698    // If we're *not* generating snapshots, e.g. running ledger-tool, then we *can* load
699    // this bank snapshot, and we do not need to check for anything else.
700    if !snapshot_config.should_generate_snapshots() {
701        return Some(highest_bank_snapshot);
702    }
703
704    // Otherwise, the bank snapshot's full snapshot slot *must* be the same as
705    // the highest full snapshot archive's slot.
706    let highest_full_snapshot_archive_slot =
707        get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
708    let full_snapshot_file_slot =
709        read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
710    (full_snapshot_file_slot == highest_full_snapshot_archive_slot).then_some(highest_bank_snapshot)
711}
712
713/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
714/// directory won't be cleaned up.  Call this function to clean them up.
715pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
716    if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
717        for entry in entries.flatten() {
718            if entry
719                .file_name()
720                .to_str()
721                .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
722                .unwrap_or(false)
723            {
724                let path = entry.path();
725                let result = if path.is_dir() {
726                    fs::remove_dir_all(&path)
727                } else {
728                    fs::remove_file(&path)
729                };
730                if let Err(err) = result {
731                    warn!(
732                        "Failed to remove temporary snapshot archive '{}': {err}",
733                        path.display(),
734                    );
735                }
736            }
737        }
738    }
739}
740
741/// Serializes and archives a snapshot package
742pub fn serialize_and_archive_snapshot_package(
743    snapshot_package: SnapshotPackage,
744    snapshot_config: &SnapshotConfig,
745) -> Result<SnapshotArchiveInfo> {
746    let SnapshotPackage {
747        snapshot_kind,
748        slot: snapshot_slot,
749        block_height,
750        hash: snapshot_hash,
751        mut snapshot_storages,
752        status_cache_slot_deltas,
753        bank_fields_to_serialize,
754        bank_hash_stats,
755        accounts_delta_hash,
756        accounts_hash,
757        epoch_accounts_hash,
758        bank_incremental_snapshot_persistence,
759        write_version,
760        enqueued: _,
761    } = snapshot_package;
762
763    let bank_snapshot_info = serialize_snapshot(
764        &snapshot_config.bank_snapshots_dir,
765        snapshot_config.snapshot_version,
766        snapshot_storages.as_slice(),
767        status_cache_slot_deltas.as_slice(),
768        bank_fields_to_serialize,
769        bank_hash_stats,
770        accounts_delta_hash,
771        accounts_hash,
772        epoch_accounts_hash,
773        bank_incremental_snapshot_persistence.as_ref(),
774        write_version,
775    )?;
776
777    // now write the full snapshot slot file after serializing so this bank snapshot is loadable
778    let full_snapshot_archive_slot = match snapshot_kind {
779        SnapshotKind::FullSnapshot => snapshot_slot,
780        SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
781    };
782    write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
783        .map_err(|err| {
784            IoError::other(format!(
785                "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, kind {snapshot_kind:?}: {err}",
786            ))
787        })?;
788
789    let snapshot_archive_path = match snapshot_package.snapshot_kind {
790        SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
791            &snapshot_config.full_snapshot_archives_dir,
792            snapshot_package.slot,
793            &snapshot_package.hash,
794            snapshot_config.archive_format,
795        ),
796        SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
797            // After the snapshot has been serialized, it is now safe (and required) to prune all
798            // the storages that are *not* to be archived for this incremental snapshot.
799            snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
800            build_incremental_snapshot_archive_path(
801                &snapshot_config.incremental_snapshot_archives_dir,
802                incremental_snapshot_base_slot,
803                snapshot_package.slot,
804                &snapshot_package.hash,
805                snapshot_config.archive_format,
806            )
807        }
808    };
809
810    let snapshot_archive_info = archive_snapshot(
811        snapshot_kind,
812        snapshot_slot,
813        snapshot_hash,
814        snapshot_storages.as_slice(),
815        &bank_snapshot_info.snapshot_dir,
816        snapshot_archive_path,
817        snapshot_config.archive_format,
818    )?;
819
820    Ok(snapshot_archive_info)
821}
822
/// Serializes a snapshot into `bank_snapshots_dir`
///
/// Creates the bank snapshot directory for `bank_fields.slot` (as returned by
/// `get_bank_snapshot_dir()`) and populates it with:
/// - hard links to every storage in `snapshot_storages` (see `hard_link_storages_to_snapshot()`)
/// - the serialized bank, in a file named `get_snapshot_file_name(slot)`
/// - the serialized status cache (`SNAPSHOT_STATUS_CACHE_FILENAME`)
/// - the snapshot version file (`SNAPSHOT_VERSION_FILENAME`)
/// - the state-complete marker file (`SNAPSHOT_STATE_COMPLETE_FILENAME`), written last
///
/// Returns a `BankSnapshotInfo` of kind `BankSnapshotKind::Pre` describing the new directory.
///
/// # Errors
///
/// Every failure is reported as `SnapshotError::AddBankSnapshot`, which wraps the specific
/// `AddBankSnapshotError` and the slot. This includes the case where the bank snapshot
/// directory already exists.
#[allow(clippy::too_many_arguments)]
fn serialize_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    snapshot_version: SnapshotVersion,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    slot_deltas: &[BankSlotDelta],
    mut bank_fields: BankFieldsToSerialize,
    bank_hash_stats: BankHashStats,
    accounts_delta_hash: AccountsDeltaHash,
    accounts_hash: AccountsHash,
    epoch_accounts_hash: Option<EpochAccountsHash>,
    bank_incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
    write_version: StoredMetaWriteVersion,
) -> Result<BankSnapshotInfo> {
    let slot = bank_fields.slot;

    // this lambda function is to facilitate converting between
    // the AddBankSnapshotError and SnapshotError types
    let do_serialize_snapshot = || {
        let mut measure_everything = Measure::start("");
        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
        // Never overwrite an existing bank snapshot for this slot.
        // NOTE(review): if any later step in this closure fails, the partially written directory
        // is not removed here, so a retry for the same slot would hit this error.
        if bank_snapshot_dir.exists() {
            return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
                bank_snapshot_dir,
            ));
        }
        fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
            AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
        })?;

        // the bank snapshot is stored as bank_snapshots_dir/slot/slot
        let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
        info!(
            "Creating bank snapshot for slot {slot} at '{}'",
            bank_snapshot_path.display(),
        );

        // Flush every storage before it is hard-linked and serialized below.
        let (_, flush_storages_us) = measure_us!({
            for storage in snapshot_storages {
                storage.flush().map_err(|err| {
                    AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
                })?;
            }
        });

        // We are constructing the snapshot directory to contain the full snapshot state information to allow
        // constructing a bank from this directory.  It acts like an archive to include the full state.
        // The set of the account storages files is the necessary part of this snapshot state.  Hard-link them
        // from the operational accounts/ directory to here.
        let (_, hard_link_storages_us) = measure_us!(hard_link_storages_to_snapshot(
            &bank_snapshot_dir,
            slot,
            snapshot_storages
        )
        .map_err(AddBankSnapshotError::HardLinkStorages)?);

        let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
            // Move the epoch stakes out of `bank_fields` so they can be handed to
            // `ExtraFieldsToSerialize` while the rest of `bank_fields` is passed by value below.
            let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
            let extra_fields = ExtraFieldsToSerialize {
                lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
                incremental_snapshot_persistence: bank_incremental_snapshot_persistence,
                epoch_accounts_hash,
                versioned_epoch_stakes,
            };
            serde_snapshot::serialize_bank_snapshot_into(
                stream,
                bank_fields,
                bank_hash_stats,
                accounts_delta_hash,
                accounts_hash,
                &get_storages_to_serialize(snapshot_storages),
                extra_fields,
                write_version,
            )?;
            Ok(())
        };
        // serialize_snapshot_data_file() rejects files larger than MAX_SNAPSHOT_DATA_FILE_SIZE
        let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
            serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
                .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
            "bank serialize"
        );

        let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
        let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
            snapshot_bank_utils::serialize_status_cache(slot_deltas, &status_cache_path)
                .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
        );

        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
        let (_, write_version_file_us) = measure_us!(fs::write(
            &version_path,
            snapshot_version.as_str().as_bytes(),
        )
        .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);

        // Mark this directory complete so it can be used.  Check this flag first before selecting for deserialization.
        let state_complete_path = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
        let (_, write_state_complete_file_us) = measure_us!(fs::File::create(&state_complete_path)
            .map_err(|err| {
                AddBankSnapshotError::CreateStateCompleteFile(err, state_complete_path)
            })?);

        measure_everything.stop();

        // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
        datapoint_info!(
            "snapshot_bank",
            ("slot", slot, i64),
            ("bank_size", bank_snapshot_consumed_size, i64),
            ("status_cache_size", status_cache_consumed_size, i64),
            ("flush_storages_us", flush_storages_us, i64),
            ("hard_link_storages_us", hard_link_storages_us, i64),
            ("bank_serialize_us", bank_serialize.as_us(), i64),
            ("status_cache_serialize_us", status_cache_serialize_us, i64),
            ("write_version_file_us", write_version_file_us, i64),
            (
                "write_state_complete_file_us",
                write_state_complete_file_us,
                i64
            ),
            ("total_us", measure_everything.as_us(), i64),
        );

        info!(
            "{} for slot {} at {}",
            bank_serialize,
            slot,
            bank_snapshot_path.display(),
        );

        Ok(BankSnapshotInfo {
            slot,
            snapshot_kind: BankSnapshotKind::Pre,
            snapshot_dir: bank_snapshot_dir,
            snapshot_version,
        })
    };

    do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
}
964
/// Archives a snapshot into `archive_path`
///
/// A staging directory (a `tempfile::TempDir`) is created in the parent directory of
/// `archive_path`. It holds symlinks into `bank_snapshot_dir`, arranged in the archive layout:
/// - `SNAPSHOT_VERSION_FILENAME` at the top level
/// - `snapshots/<slot>/<slot>` for the serialized bank
/// - `snapshots/SNAPSHOT_STATUS_CACHE_FILENAME` for the status cache
///
/// These entries are written first, followed by every storage in `snapshot_storages` under
/// `accounts/`. The tar stream is compressed according to `archive_format` and written to a
/// temporary file in that same parent directory. That file is then renamed to `archive_path`.
///
/// Returns a `SnapshotArchiveInfo` describing the final archive.
///
/// # Panics
///
/// Panics if `archive_path` has no parent directory.
///
/// NOTE(review): if an error occurs after the temporary archive file has been created, this
/// function returns without removing that file.
fn archive_snapshot(
    snapshot_kind: SnapshotKind,
    snapshot_slot: Slot,
    snapshot_hash: SnapshotHash,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    bank_snapshot_dir: impl AsRef<Path>,
    archive_path: impl AsRef<Path>,
    archive_format: ArchiveFormat,
) -> Result<SnapshotArchiveInfo> {
    use ArchiveSnapshotPackageError as E;
    const SNAPSHOTS_DIR: &str = "snapshots";
    const ACCOUNTS_DIR: &str = "accounts";
    info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");

    let mut timer = Measure::start("snapshot_package-package_snapshots");
    let tar_dir = archive_path
        .as_ref()
        .parent()
        .expect("Tar output path is invalid");

    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;

    // Create the staging directories
    let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
    let staging_dir = tempfile::Builder::new()
        .prefix(&format!("{}{}-", staging_dir_prefix, snapshot_slot))
        .tempdir_in(tar_dir)
        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
    let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);

    let slot_str = snapshot_slot.to_string();
    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
    // Creates staging snapshots/<slot>/
    fs::create_dir_all(&staging_snapshot_dir)
        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;

    // To be a source for symlinking and archiving, the path need to be an absolute path
    let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
        E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
    })?;
    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
    let src_snapshot_file = src_snapshot_dir.join(slot_str);
    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;

    // Following the existing archive format, the status cache is under snapshots/, not under <slot>/
    // like in the snapshot dir.
    let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    symlink::symlink_file(&src_status_cache, &staging_status_cache)
        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;

    // The bank snapshot has the version file, so symlink it to the correct staging path
    let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
    let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
    })?;

    // Tar the staging directory into the archive at `staging_archive_path`
    let staging_archive_path = tar_dir.join(format!(
        "{}{}.{}",
        staging_dir_prefix,
        snapshot_slot,
        archive_format.extension(),
    ));

    {
        let mut archive_file = fs::File::create(&staging_archive_path)
            .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;

        // Writes all tar entries into `encoder`; shared by every archive format below.
        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
            let mut archive = tar::Builder::new(encoder);
            // Serialize the version and snapshots files before accounts so we can quickly determine the version
            // and other bank fields. This is necessary if we want to interleave unpacking with reconstruction
            archive
                .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
                .map_err(E::ArchiveVersionFile)?;
            archive
                .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
                .map_err(E::ArchiveSnapshotsDir)?;

            for storage in snapshot_storages {
                let path_in_archive = Path::new(ACCOUNTS_DIR)
                    .join(AccountsFile::file_name(storage.slot(), storage.id()));
                // Mmap-backed storages are appended from their bytes with a hand-built header
                // whose size is `storage.capacity()`; file-backed storages are appended by path.
                match storage.accounts.internals_for_archive() {
                    InternalsForArchive::Mmap(data) => {
                        let mut header = tar::Header::new_gnu();
                        header.set_path(path_in_archive).map_err(|err| {
                            E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
                        })?;
                        header.set_size(storage.capacity());
                        header.set_cksum();
                        archive.append(&header, data)
                    }
                    InternalsForArchive::FileIo(path) => {
                        archive.append_path_with_name(path, path_in_archive)
                    }
                }
                .map_err(|err| E::ArchiveAccountStorageFile(err, storage.path().to_path_buf()))?;
            }

            archive.into_inner().map_err(E::FinishArchive)?;
            Ok(())
        };

        match archive_format {
            ArchiveFormat::TarBzip2 => {
                let mut encoder =
                    bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::best());
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarGzip => {
                let mut encoder =
                    flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarZstd => {
                // Compression level of 1 is optimized for speed.
                let mut encoder =
                    zstd::stream::Encoder::new(archive_file, 1).map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarLz4 => {
                let mut encoder = lz4::EncoderBuilder::new()
                    .level(1)
                    .build(archive_file)
                    .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                let (_output, result) = encoder.finish();
                result.map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::Tar => {
                // Uncompressed: the tar stream is written straight into the file.
                do_archive_files(&mut archive_file)?;
            }
        };
    }

    // Atomically move the archive into position for other validators to find
    let metadata = fs::metadata(&staging_archive_path)
        .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
    let archive_path = archive_path.as_ref().to_path_buf();
    fs::rename(&staging_archive_path, &archive_path)
        .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;

    timer.stop();
    info!(
        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
        archive_path.display(),
        snapshot_slot,
        timer.as_ms(),
        metadata.len()
    );

    datapoint_info!(
        "archive-snapshot-package",
        ("slot", snapshot_slot, i64),
        ("archive_format", archive_format.to_string(), String),
        ("duration_ms", timer.as_ms(), i64),
        (
            if snapshot_kind.is_full_snapshot() {
                "full-snapshot-archive-size"
            } else {
                "incremental-snapshot-archive-size"
            },
            metadata.len(),
            i64
        ),
    );
    Ok(SnapshotArchiveInfo {
        path: archive_path,
        slot: snapshot_slot,
        hash: snapshot_hash,
        archive_format,
    })
}
1145
1146/// Get the bank snapshots in a directory
1147pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1148    let mut bank_snapshots = Vec::default();
1149    match fs::read_dir(&bank_snapshots_dir) {
1150        Err(err) => {
1151            info!(
1152                "Unable to read bank snapshots directory '{}': {err}",
1153                bank_snapshots_dir.as_ref().display(),
1154            );
1155        }
1156        Ok(paths) => paths
1157            .filter_map(|entry| {
1158                // check if this entry is a directory and only a Slot
1159                // bank snapshots are bank_snapshots_dir/slot/slot(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION)
1160                entry
1161                    .ok()
1162                    .filter(|entry| entry.path().is_dir())
1163                    .and_then(|entry| {
1164                        entry
1165                            .path()
1166                            .file_name()
1167                            .and_then(|file_name| file_name.to_str())
1168                            .and_then(|file_name| file_name.parse::<Slot>().ok())
1169                    })
1170            })
1171            .for_each(
1172                |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1173                    Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1174                    // Other threads may be modifying bank snapshots in parallel; only return
1175                    // snapshots that are complete as deemed by BankSnapshotInfo::new_from_dir()
1176                    Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1177                },
1178            ),
1179    }
1180    bank_snapshots
1181}
1182
1183/// Get the bank snapshots in a directory
1184///
1185/// This function retains only the bank snapshots of kind BankSnapshotKind::Pre
1186pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1187    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1188    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1189    bank_snapshots
1190}
1191
1192/// Get the bank snapshots in a directory
1193///
1194/// This function retains only the bank snapshots of kind BankSnapshotKind::Post
1195pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1196    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1197    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1198    bank_snapshots
1199}
1200
1201/// Get the bank snapshot with the highest slot in a directory
1202///
1203/// This function gets the highest bank snapshot of kind BankSnapshotKind::Pre
1204pub fn get_highest_bank_snapshot_pre(
1205    bank_snapshots_dir: impl AsRef<Path>,
1206) -> Option<BankSnapshotInfo> {
1207    do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1208}
1209
1210/// Get the bank snapshot with the highest slot in a directory
1211///
1212/// This function gets the highest bank snapshot of kind BankSnapshotKind::Post
1213pub fn get_highest_bank_snapshot_post(
1214    bank_snapshots_dir: impl AsRef<Path>,
1215) -> Option<BankSnapshotInfo> {
1216    do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1217}
1218
1219/// Get the bank snapshot with the highest slot in a directory
1220///
1221/// This function gets the highest bank snapshot of any kind
1222pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1223    do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1224}
1225
1226fn do_get_highest_bank_snapshot(
1227    mut bank_snapshots: Vec<BankSnapshotInfo>,
1228) -> Option<BankSnapshotInfo> {
1229    bank_snapshots.sort_unstable();
1230    bank_snapshots.into_iter().next_back()
1231}
1232
1233pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1234where
1235    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1236{
1237    serialize_snapshot_data_file_capped::<F>(
1238        data_file_path,
1239        MAX_SNAPSHOT_DATA_FILE_SIZE,
1240        serializer,
1241    )
1242}
1243
1244pub fn deserialize_snapshot_data_file<T: Sized>(
1245    data_file_path: &Path,
1246    deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1247) -> Result<T> {
1248    let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1249        deserializer(streams.full_snapshot_stream)
1250    };
1251
1252    let wrapped_data_file_path = SnapshotRootPaths {
1253        full_snapshot_root_file_path: data_file_path.to_path_buf(),
1254        incremental_snapshot_root_file_path: None,
1255    };
1256
1257    deserialize_snapshot_data_files_capped(
1258        &wrapped_data_file_path,
1259        MAX_SNAPSHOT_DATA_FILE_SIZE,
1260        wrapped_deserializer,
1261    )
1262}
1263
1264pub fn deserialize_snapshot_data_files<T: Sized>(
1265    snapshot_root_paths: &SnapshotRootPaths,
1266    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1267) -> Result<T> {
1268    deserialize_snapshot_data_files_capped(
1269        snapshot_root_paths,
1270        MAX_SNAPSHOT_DATA_FILE_SIZE,
1271        deserializer,
1272    )
1273}
1274
1275fn serialize_snapshot_data_file_capped<F>(
1276    data_file_path: &Path,
1277    maximum_file_size: u64,
1278    serializer: F,
1279) -> Result<u64>
1280where
1281    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1282{
1283    let data_file = fs::File::create(data_file_path)?;
1284    let mut data_file_stream = BufWriter::new(data_file);
1285    serializer(&mut data_file_stream)?;
1286    data_file_stream.flush()?;
1287
1288    let consumed_size = data_file_stream.stream_position()?;
1289    if consumed_size > maximum_file_size {
1290        let error_message = format!(
1291            "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1292            data_file_path.display(),
1293        );
1294        return Err(IoError::other(error_message).into());
1295    }
1296    Ok(consumed_size)
1297}
1298
1299fn deserialize_snapshot_data_files_capped<T: Sized>(
1300    snapshot_root_paths: &SnapshotRootPaths,
1301    maximum_file_size: u64,
1302    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1303) -> Result<T> {
1304    let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1305        create_snapshot_data_file_stream(
1306            &snapshot_root_paths.full_snapshot_root_file_path,
1307            maximum_file_size,
1308        )?;
1309
1310    let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1311        if let Some(ref incremental_snapshot_root_file_path) =
1312            snapshot_root_paths.incremental_snapshot_root_file_path
1313        {
1314            Some(create_snapshot_data_file_stream(
1315                incremental_snapshot_root_file_path,
1316                maximum_file_size,
1317            )?)
1318        } else {
1319            None
1320        }
1321        .unzip();
1322
1323    let mut snapshot_streams = SnapshotStreams {
1324        full_snapshot_stream: &mut full_snapshot_data_file_stream,
1325        incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1326    };
1327    let ret = deserializer(&mut snapshot_streams)?;
1328
1329    check_deserialize_file_consumed(
1330        full_snapshot_file_size,
1331        &snapshot_root_paths.full_snapshot_root_file_path,
1332        &mut full_snapshot_data_file_stream,
1333    )?;
1334
1335    if let Some(ref incremental_snapshot_root_file_path) =
1336        snapshot_root_paths.incremental_snapshot_root_file_path
1337    {
1338        check_deserialize_file_consumed(
1339            incremental_snapshot_file_size.unwrap(),
1340            incremental_snapshot_root_file_path,
1341            incremental_snapshot_data_file_stream.as_mut().unwrap(),
1342        )?;
1343    }
1344
1345    Ok(ret)
1346}
1347
1348/// Before running the deserializer function, perform common operations on the snapshot archive
1349/// files, such as checking the file size and opening the file into a stream.
1350fn create_snapshot_data_file_stream(
1351    snapshot_root_file_path: impl AsRef<Path>,
1352    maximum_file_size: u64,
1353) -> Result<(u64, BufReader<std::fs::File>)> {
1354    let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1355
1356    if snapshot_file_size > maximum_file_size {
1357        let error_message = format!(
1358            "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1359            snapshot_root_file_path.as_ref().display(),
1360            snapshot_file_size,
1361            maximum_file_size,
1362        );
1363        return Err(IoError::other(error_message).into());
1364    }
1365
1366    let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1367    let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1368
1369    Ok((snapshot_file_size, snapshot_data_file_stream))
1370}
1371
1372/// After running the deserializer function, perform common checks to ensure the snapshot archive
1373/// files were consumed correctly.
1374fn check_deserialize_file_consumed(
1375    file_size: u64,
1376    file_path: impl AsRef<Path>,
1377    file_stream: &mut BufReader<std::fs::File>,
1378) -> Result<()> {
1379    let consumed_size = file_stream.stream_position()?;
1380
1381    if consumed_size != file_size {
1382        let error_message = format!(
1383            "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1384            file_path.as_ref().display(),
1385            file_size,
1386            consumed_size,
1387        );
1388        return Err(IoError::other(error_message).into());
1389    }
1390
1391    Ok(())
1392}
1393
1394/// Return account path from the appendvec path after checking its format.
1395fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1396    let run_path = appendvec_path.parent()?;
1397    let run_file_name = run_path.file_name()?;
1398    // All appendvec files should be under <account_path>/run/.
1399    // When generating the bank snapshot directory, they are hardlinked to <account_path>/snapshot/<slot>/
1400    if run_file_name != ACCOUNTS_RUN_DIR {
1401        error!(
1402            "The account path {} does not have run/ as its immediate parent directory.",
1403            run_path.display()
1404        );
1405        return None;
1406    }
1407    let account_path = run_path.parent()?;
1408    Some(account_path.to_path_buf())
1409}
1410
1411/// From an appendvec path, derive the snapshot hardlink path.  If the corresponding snapshot hardlink
1412/// directory does not exist, create it.
1413fn get_snapshot_accounts_hardlink_dir(
1414    appendvec_path: &Path,
1415    bank_slot: Slot,
1416    account_paths: &mut HashSet<PathBuf>,
1417    hardlinks_dir: impl AsRef<Path>,
1418) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1419    let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1420        GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1421    })?;
1422
1423    let snapshot_hardlink_dir = account_path
1424        .join(ACCOUNTS_SNAPSHOT_DIR)
1425        .join(bank_slot.to_string());
1426
1427    // Use the hashset to track, to avoid checking the file system.  Only set up the hardlink directory
1428    // and the symlink to it at the first time of seeing the account_path.
1429    if !account_paths.contains(&account_path) {
1430        let idx = account_paths.len();
1431        debug!(
1432            "for appendvec_path {}, create hard-link path {}",
1433            appendvec_path.display(),
1434            snapshot_hardlink_dir.display()
1435        );
1436        fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1437            GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1438                err,
1439                snapshot_hardlink_dir.clone(),
1440            )
1441        })?;
1442        let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1443        symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1444            GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1445                source: err,
1446                original: snapshot_hardlink_dir.clone(),
1447                link: symlink_path,
1448            }
1449        })?;
1450        account_paths.insert(account_path);
1451    };
1452
1453    Ok(snapshot_hardlink_dir)
1454}
1455
1456/// Hard-link the files from accounts/ to snapshot/<bank_slot>/accounts/
1457/// This keeps the appendvec files alive and with the bank snapshot.  The slot and id
1458/// in the file names are also updated in case its file is a recycled one with inconsistent slot
1459/// and id.
1460pub fn hard_link_storages_to_snapshot(
1461    bank_snapshot_dir: impl AsRef<Path>,
1462    bank_slot: Slot,
1463    snapshot_storages: &[Arc<AccountStorageEntry>],
1464) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1465    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1466    fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1467        HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1468            err,
1469            accounts_hardlinks_dir.clone(),
1470        )
1471    })?;
1472
1473    let mut account_paths: HashSet<PathBuf> = HashSet::new();
1474    for storage in snapshot_storages {
1475        let storage_path = storage.accounts.path();
1476        let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1477            storage_path,
1478            bank_slot,
1479            &mut account_paths,
1480            &accounts_hardlinks_dir,
1481        )?;
1482        // The appendvec could be recycled, so its filename may not be consistent to the slot and id.
1483        // Use the storage slot and id to compose a consistent file name for the hard-link file.
1484        let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1485        let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1486        fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1487            HardLinkStoragesToSnapshotError::HardLinkStorage(
1488                err,
1489                storage_path.to_path_buf(),
1490                hard_link_path,
1491            )
1492        })?;
1493    }
1494    Ok(())
1495}
1496
1497/// serializing needs Vec<Vec<Arc<AccountStorageEntry>>>, but data structure at runtime is Vec<Arc<AccountStorageEntry>>
1498/// translates to what we need
1499pub(crate) fn get_storages_to_serialize(
1500    snapshot_storages: &[Arc<AccountStorageEntry>],
1501) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1502    snapshot_storages
1503        .iter()
1504        .map(|storage| vec![Arc::clone(storage)])
1505        .collect::<Vec<_>>()
1506}
1507
/// Upper bound on the number of parallel readers used when untarring a snapshot archive.
///
/// Testing showed 4 to be a sweet spot for 60M-360M accounts on 16-64 cores; this may need to be
/// tuned later.
const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1510
1511/// Unarchives the given full and incremental snapshot archives, as long as they are compatible.
1512pub fn verify_and_unarchive_snapshots(
1513    bank_snapshots_dir: impl AsRef<Path>,
1514    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1515    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1516    account_paths: &[PathBuf],
1517    storage_access: StorageAccess,
1518) -> Result<(
1519    UnarchivedSnapshot,
1520    Option<UnarchivedSnapshot>,
1521    AtomicAccountsFileId,
1522)> {
1523    check_are_snapshots_compatible(
1524        full_snapshot_archive_info,
1525        incremental_snapshot_archive_info,
1526    )?;
1527
1528    let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1529
1530    let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1531    let unarchived_full_snapshot = unarchive_snapshot(
1532        &bank_snapshots_dir,
1533        TMP_SNAPSHOT_ARCHIVE_PREFIX,
1534        full_snapshot_archive_info.path(),
1535        "snapshot untar",
1536        account_paths,
1537        full_snapshot_archive_info.archive_format(),
1538        parallel_divisions,
1539        next_append_vec_id.clone(),
1540        storage_access,
1541    )?;
1542
1543    let unarchived_incremental_snapshot =
1544        if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1545            let unarchived_incremental_snapshot = unarchive_snapshot(
1546                &bank_snapshots_dir,
1547                TMP_SNAPSHOT_ARCHIVE_PREFIX,
1548                incremental_snapshot_archive_info.path(),
1549                "incremental snapshot untar",
1550                account_paths,
1551                incremental_snapshot_archive_info.archive_format(),
1552                parallel_divisions,
1553                next_append_vec_id.clone(),
1554                storage_access,
1555            )?;
1556            Some(unarchived_incremental_snapshot)
1557        } else {
1558            None
1559        };
1560
1561    Ok((
1562        unarchived_full_snapshot,
1563        unarchived_incremental_snapshot,
1564        Arc::try_unwrap(next_append_vec_id).unwrap(),
1565    ))
1566}
1567
1568/// Spawns a thread for unpacking a snapshot
1569fn spawn_unpack_snapshot_thread(
1570    file_sender: Sender<PathBuf>,
1571    account_paths: Arc<Vec<PathBuf>>,
1572    ledger_dir: Arc<PathBuf>,
1573    mut archive: Archive<SharedBufferReader>,
1574    parallel_selector: Option<ParallelSelector>,
1575    thread_index: usize,
1576) -> JoinHandle<()> {
1577    Builder::new()
1578        .name(format!("solUnpkSnpsht{thread_index:02}"))
1579        .spawn(move || {
1580            hardened_unpack::streaming_unpack_snapshot(
1581                &mut archive,
1582                ledger_dir.as_path(),
1583                &account_paths,
1584                parallel_selector,
1585                &file_sender,
1586            )
1587            .unwrap();
1588        })
1589        .unwrap()
1590}
1591
1592/// Streams unpacked files across channel
1593fn streaming_unarchive_snapshot(
1594    file_sender: Sender<PathBuf>,
1595    account_paths: Vec<PathBuf>,
1596    ledger_dir: PathBuf,
1597    snapshot_archive_path: PathBuf,
1598    archive_format: ArchiveFormat,
1599    num_threads: usize,
1600) -> Vec<JoinHandle<()>> {
1601    let account_paths = Arc::new(account_paths);
1602    let ledger_dir = Arc::new(ledger_dir);
1603    let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
1604
1605    // All shared buffer readers need to be created before the threads are spawned
1606    let archives: Vec<_> = (0..num_threads)
1607        .map(|_| {
1608            let reader = SharedBufferReader::new(&shared_buffer);
1609            Archive::new(reader)
1610        })
1611        .collect();
1612
1613    archives
1614        .into_iter()
1615        .enumerate()
1616        .map(|(thread_index, archive)| {
1617            let parallel_selector = Some(ParallelSelector {
1618                index: thread_index,
1619                divisions: num_threads,
1620            });
1621
1622            spawn_unpack_snapshot_thread(
1623                file_sender.clone(),
1624                account_paths.clone(),
1625                ledger_dir.clone(),
1626                archive,
1627                parallel_selector,
1628                thread_index,
1629            )
1630        })
1631        .collect()
1632}
1633
1634/// BankSnapshotInfo::new_from_dir() requires a few meta files to accept a snapshot dir
1635/// as a valid one.  A dir unpacked from an archive lacks these files.  Fill them here to
1636/// allow new_from_dir() checks to pass.  These checks are not needed for unpacked dirs,
1637/// but it is not clean to add another flag to new_from_dir() to skip them.
1638fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1639    let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1640    if !snapshots_dir.is_dir() {
1641        return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1642    }
1643
1644    // The unpacked dir has a single slot dir, which is the snapshot slot dir.
1645    let slot_dir = std::fs::read_dir(&snapshots_dir)
1646        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1647        .find(|entry| entry.as_ref().unwrap().path().is_dir())
1648        .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1649        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1650        .path();
1651
1652    let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1653    fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1654
1655    let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1656    fs::hard_link(
1657        status_cache_file,
1658        slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1659    )?;
1660
1661    let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
1662    fs::File::create(state_complete_file)?;
1663
1664    Ok(())
1665}
1666
1667/// Perform the common tasks when unarchiving a snapshot.  Handles creating the temporary
1668/// directories, untaring, reading the version file, and then returning those fields plus the
1669/// rebuilt storage
1670fn unarchive_snapshot(
1671    bank_snapshots_dir: impl AsRef<Path>,
1672    unpacked_snapshots_dir_prefix: &'static str,
1673    snapshot_archive_path: impl AsRef<Path>,
1674    measure_name: &'static str,
1675    account_paths: &[PathBuf],
1676    archive_format: ArchiveFormat,
1677    parallel_divisions: usize,
1678    next_append_vec_id: Arc<AtomicAccountsFileId>,
1679    storage_access: StorageAccess,
1680) -> Result<UnarchivedSnapshot> {
1681    let unpack_dir = tempfile::Builder::new()
1682        .prefix(unpacked_snapshots_dir_prefix)
1683        .tempdir_in(bank_snapshots_dir)?;
1684    let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
1685
1686    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1687    streaming_unarchive_snapshot(
1688        file_sender,
1689        account_paths.to_vec(),
1690        unpack_dir.path().to_path_buf(),
1691        snapshot_archive_path.as_ref().to_path_buf(),
1692        archive_format,
1693        parallel_divisions,
1694    );
1695
1696    let num_rebuilder_threads = num_cpus::get_physical()
1697        .saturating_sub(parallel_divisions)
1698        .max(1);
1699    let (version_and_storages, measure_untar) = measure_time!(
1700        SnapshotStorageRebuilder::rebuild_storage(
1701            file_receiver,
1702            num_rebuilder_threads,
1703            next_append_vec_id,
1704            SnapshotFrom::Archive,
1705            storage_access,
1706        )?,
1707        measure_name
1708    );
1709    info!("{}", measure_untar);
1710
1711    create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
1712
1713    let RebuiltSnapshotStorage {
1714        snapshot_version,
1715        storage,
1716    } = version_and_storages;
1717    Ok(UnarchivedSnapshot {
1718        unpack_dir,
1719        storage,
1720        unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
1721            unpacked_snapshots_dir,
1722            snapshot_version,
1723        },
1724        measure_untar,
1725    })
1726}
1727
1728/// Streams snapshot dir files across channel
1729/// Follow the flow of streaming_unarchive_snapshot(), but handle the from_dir case.
1730fn streaming_snapshot_dir_files(
1731    file_sender: Sender<PathBuf>,
1732    snapshot_file_path: impl Into<PathBuf>,
1733    snapshot_version_path: impl Into<PathBuf>,
1734    account_paths: &[PathBuf],
1735) -> Result<()> {
1736    file_sender.send(snapshot_file_path.into())?;
1737    file_sender.send(snapshot_version_path.into())?;
1738
1739    for account_path in account_paths {
1740        for file in fs::read_dir(account_path)? {
1741            file_sender.send(file?.path())?;
1742        }
1743    }
1744
1745    Ok(())
1746}
1747
/// Performs the common tasks when deserializing a snapshot from a bank snapshot directory
/// (as opposed to an archive).
///
/// Steps:
/// 1. For every symlink in the bank snapshot's `SNAPSHOT_ACCOUNTS_HARDLINKS` dir, resolve the
///    `<account_path>/snapshot/<slot>` dir it points to. Check that the sibling run dir
///    (`<account_path>/ACCOUNTS_RUN_DIR`) is one of `account_paths`. Then hard-link every file
///    of the snapshot dir into that run dir.
/// 2. Stream the snapshot file, the version file, and every entry directly inside each of
///    `account_paths` to `SnapshotStorageRebuilder::rebuild_storage()`, using
///    `SnapshotFrom::Dir`.
///
/// Only the rebuilt storage map is returned. The snapshot version read by the rebuilder is
/// discarded.
///
/// NOTE(review): `account_paths` are expected to be the `<account_path>/run` dirs, since the
/// derived run path is looked up in this set. Confirm against callers.
///
/// # Errors
///
/// - `SnapshotError::AccountPathsMismatch` if a symlink resolves to an account path whose run
///   dir is not in `account_paths`.
/// - `SnapshotError::InvalidAccountPath` / `SnapshotError::InvalidAppendVecPath` for paths
///   without the expected parents / file name.
/// - An I/O error if reading a directory or symlink, or creating a hard link, fails.
/// - Errors from the storage rebuilder are propagated.
pub fn rebuild_storages_from_snapshot_dir(
    snapshot_info: &BankSnapshotInfo,
    account_paths: &[PathBuf],
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    storage_access: StorageAccess,
) -> Result<AccountStorageMap> {
    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
    let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);

    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
        IoError::other(format!(
            "failed to read accounts hardlinks dir '{}': {err}",
            accounts_hardlinks.display(),
        ))
    })?;
    for dir_entry in read_dir {
        let symlink_path = dir_entry?.path();
        // The symlink points to <account_path>/snapshot/<slot>, which contains the account files' hardlinks.
        // The corresponding run path should be <account_path>/run/
        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
            IoError::other(format!(
                "failed to read symlink '{}': {err}",
                symlink_path.display(),
            ))
        })?;
        // Go up two levels (<slot>, then snapshot/) to reach <account_path>, then append the run dir.
        let account_run_path = account_snapshot_path
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .join(ACCOUNTS_RUN_DIR);
        if !account_run_paths.contains(&account_run_path) {
            // The appendvec from the bank snapshot storage does not match any of the provided account_paths set.
            // The account paths have changed so the snapshot is no longer usable.
            return Err(SnapshotError::AccountPathsMismatch);
        }
        // Generate hard-links to make the account files available in the main accounts/, and let the new appendvec
        // paths be in accounts/
        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
            IoError::other(format!(
                "failed to read account snapshot dir '{}': {err}",
                account_snapshot_path.display(),
            ))
        })?;
        for file in read_dir {
            let file_path = file?.path();
            let file_name = file_path
                .file_name()
                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
            let dest_path = account_run_path.join(file_name);
            fs::hard_link(&file_path, &dest_path).map_err(|err| {
                IoError::other(format!(
                    "failed to hard link from '{}' to '{}': {err}",
                    file_path.display(),
                    dest_path.display(),
                ))
            })?;
        }
    }

    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    let snapshot_file_path = &snapshot_info.snapshot_path();
    let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    // All paths are sent before the rebuilder starts; the sender is moved in and dropped on return.
    streaming_snapshot_dir_files(
        file_sender,
        snapshot_file_path,
        snapshot_version_path,
        account_paths,
    )?;

    // Use all physical cores but one for rebuilding, with a minimum of one thread.
    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
    let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
        file_receiver,
        num_rebuilder_threads,
        next_append_vec_id,
        SnapshotFrom::Dir,
        storage_access,
    )?;

    // The snapshot version is not needed by callers of this function.
    let RebuiltSnapshotStorage {
        snapshot_version: _,
        storage,
    } = version_and_storages;
    Ok(storage)
}
1838
1839/// Reads the `snapshot_version` from a file. Before opening the file, its size
1840/// is compared to `MAX_SNAPSHOT_VERSION_FILE_SIZE`. If the size exceeds this
1841/// threshold, it is not opened and an error is returned.
1842fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
1843    // Check file size.
1844    let file_metadata = fs::metadata(&path).map_err(|err| {
1845        IoError::other(format!(
1846            "failed to query snapshot version file metadata '{}': {err}",
1847            path.as_ref().display(),
1848        ))
1849    })?;
1850    let file_size = file_metadata.len();
1851    if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1852        let error_message = format!(
1853            "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1854            path.as_ref().display(),
1855            file_size,
1856            MAX_SNAPSHOT_VERSION_FILE_SIZE,
1857        );
1858        return Err(IoError::other(error_message).into());
1859    }
1860
1861    // Read snapshot_version from file.
1862    let mut snapshot_version = String::new();
1863    let mut file = fs::File::open(&path).map_err(|err| {
1864        IoError::other(format!(
1865            "failed to open snapshot version file '{}': {err}",
1866            path.as_ref().display()
1867        ))
1868    })?;
1869    file.read_to_string(&mut snapshot_version).map_err(|err| {
1870        IoError::other(format!(
1871            "failed to read snapshot version from file '{}': {err}",
1872            path.as_ref().display()
1873        ))
1874    })?;
1875
1876    Ok(snapshot_version.trim().to_string())
1877}
1878
1879/// Check if an incremental snapshot is compatible with a full snapshot.  This is done by checking
1880/// if the incremental snapshot's base slot is the same as the full snapshot's slot.
1881fn check_are_snapshots_compatible(
1882    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1883    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1884) -> Result<()> {
1885    if incremental_snapshot_archive_info.is_none() {
1886        return Ok(());
1887    }
1888
1889    let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1890
1891    (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1892        .then_some(())
1893        .ok_or_else(|| {
1894            SnapshotError::MismatchedBaseSlot(
1895                full_snapshot_archive_info.slot(),
1896                incremental_snapshot_archive_info.base_slot(),
1897            )
1898        })
1899}
1900
1901/// Get the `&str` from a `&Path`
1902pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
1903    path.file_name()
1904        .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
1905        .to_str()
1906        .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
1907}
1908
1909pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
1910    snapshot_archives_dir
1911        .as_ref()
1912        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
1913}
1914
1915/// Build the full snapshot archive path from its components: the snapshot archives directory, the
1916/// snapshot slot, the accounts hash, and the archive format.
1917pub fn build_full_snapshot_archive_path(
1918    full_snapshot_archives_dir: impl AsRef<Path>,
1919    slot: Slot,
1920    hash: &SnapshotHash,
1921    archive_format: ArchiveFormat,
1922) -> PathBuf {
1923    full_snapshot_archives_dir.as_ref().join(format!(
1924        "snapshot-{}-{}.{}",
1925        slot,
1926        hash.0,
1927        archive_format.extension(),
1928    ))
1929}
1930
1931/// Build the incremental snapshot archive path from its components: the snapshot archives
1932/// directory, the snapshot base slot, the snapshot slot, the accounts hash, and the archive
1933/// format.
1934pub fn build_incremental_snapshot_archive_path(
1935    incremental_snapshot_archives_dir: impl AsRef<Path>,
1936    base_slot: Slot,
1937    slot: Slot,
1938    hash: &SnapshotHash,
1939    archive_format: ArchiveFormat,
1940) -> PathBuf {
1941    incremental_snapshot_archives_dir.as_ref().join(format!(
1942        "incremental-snapshot-{}-{}-{}.{}",
1943        base_slot,
1944        slot,
1945        hash.0,
1946        archive_format.extension(),
1947    ))
1948}
1949
1950/// Parse a full snapshot archive filename into its Slot, Hash, and Archive Format
1951pub(crate) fn parse_full_snapshot_archive_filename(
1952    archive_filename: &str,
1953) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
1954    lazy_static! {
1955        static ref RE: Regex = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1956    }
1957
1958    let do_parse = || {
1959        RE.captures(archive_filename).and_then(|captures| {
1960            let slot = captures
1961                .name("slot")
1962                .map(|x| x.as_str().parse::<Slot>())?
1963                .ok()?;
1964            let hash = captures
1965                .name("hash")
1966                .map(|x| x.as_str().parse::<Hash>())?
1967                .ok()?;
1968            let archive_format = captures
1969                .name("ext")
1970                .map(|x| x.as_str().parse::<ArchiveFormat>())?
1971                .ok()?;
1972
1973            Some((slot, SnapshotHash(hash), archive_format))
1974        })
1975    };
1976
1977    do_parse().ok_or_else(|| {
1978        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
1979    })
1980}
1981
1982/// Parse an incremental snapshot archive filename into its base Slot, actual Slot, Hash, and Archive Format
1983pub(crate) fn parse_incremental_snapshot_archive_filename(
1984    archive_filename: &str,
1985) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
1986    lazy_static! {
1987        static ref RE: Regex = Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1988    }
1989
1990    let do_parse = || {
1991        RE.captures(archive_filename).and_then(|captures| {
1992            let base_slot = captures
1993                .name("base")
1994                .map(|x| x.as_str().parse::<Slot>())?
1995                .ok()?;
1996            let slot = captures
1997                .name("slot")
1998                .map(|x| x.as_str().parse::<Slot>())?
1999                .ok()?;
2000            let hash = captures
2001                .name("hash")
2002                .map(|x| x.as_str().parse::<Hash>())?
2003                .ok()?;
2004            let archive_format = captures
2005                .name("ext")
2006                .map(|x| x.as_str().parse::<ArchiveFormat>())?
2007                .ok()?;
2008
2009            Some((base_slot, slot, SnapshotHash(hash), archive_format))
2010        })
2011    };
2012
2013    do_parse().ok_or_else(|| {
2014        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2015    })
2016}
2017
2018/// Walk down the snapshot archive to collect snapshot archive file info
2019fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2020where
2021    F: Fn(PathBuf) -> Result<T>,
2022{
2023    let walk_dir = |dir: &Path| -> Vec<T> {
2024        let entry_iter = fs::read_dir(dir);
2025        match entry_iter {
2026            Err(err) => {
2027                info!(
2028                    "Unable to read snapshot archives directory '{}': {err}",
2029                    dir.display(),
2030                );
2031                vec![]
2032            }
2033            Ok(entries) => entries
2034                .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2035                .collect(),
2036        }
2037    };
2038
2039    let mut ret = walk_dir(snapshot_archives_dir);
2040    let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2041    if remote_dir.exists() {
2042        ret.append(&mut walk_dir(remote_dir.as_ref()));
2043    }
2044    ret
2045}
2046
2047/// Get a list of the full snapshot archives from a directory
2048pub fn get_full_snapshot_archives(
2049    full_snapshot_archives_dir: impl AsRef<Path>,
2050) -> Vec<FullSnapshotArchiveInfo> {
2051    get_snapshot_archives(
2052        full_snapshot_archives_dir.as_ref(),
2053        FullSnapshotArchiveInfo::new_from_path,
2054    )
2055}
2056
2057/// Get a list of the incremental snapshot archives from a directory
2058pub fn get_incremental_snapshot_archives(
2059    incremental_snapshot_archives_dir: impl AsRef<Path>,
2060) -> Vec<IncrementalSnapshotArchiveInfo> {
2061    get_snapshot_archives(
2062        incremental_snapshot_archives_dir.as_ref(),
2063        IncrementalSnapshotArchiveInfo::new_from_path,
2064    )
2065}
2066
2067/// Get the highest slot of the full snapshot archives in a directory
2068pub fn get_highest_full_snapshot_archive_slot(
2069    full_snapshot_archives_dir: impl AsRef<Path>,
2070) -> Option<Slot> {
2071    get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2072        .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2073}
2074
2075/// Get the highest slot of the incremental snapshot archives in a directory, for a given full
2076/// snapshot slot
2077pub fn get_highest_incremental_snapshot_archive_slot(
2078    incremental_snapshot_archives_dir: impl AsRef<Path>,
2079    full_snapshot_slot: Slot,
2080) -> Option<Slot> {
2081    get_highest_incremental_snapshot_archive_info(
2082        incremental_snapshot_archives_dir,
2083        full_snapshot_slot,
2084    )
2085    .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2086}
2087
2088/// Get the path (and metadata) for the full snapshot archive with the highest slot in a directory
2089pub fn get_highest_full_snapshot_archive_info(
2090    full_snapshot_archives_dir: impl AsRef<Path>,
2091) -> Option<FullSnapshotArchiveInfo> {
2092    let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2093    full_snapshot_archives.sort_unstable();
2094    full_snapshot_archives.into_iter().next_back()
2095}
2096
2097/// Get the path for the incremental snapshot archive with the highest slot, for a given full
2098/// snapshot slot, in a directory
2099pub fn get_highest_incremental_snapshot_archive_info(
2100    incremental_snapshot_archives_dir: impl AsRef<Path>,
2101    full_snapshot_slot: Slot,
2102) -> Option<IncrementalSnapshotArchiveInfo> {
2103    // Since we want to filter down to only the incremental snapshot archives that have the same
2104    // full snapshot slot as the value passed in, perform the filtering before sorting to avoid
2105    // doing unnecessary work.
2106    let mut incremental_snapshot_archives =
2107        get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2108            .into_iter()
2109            .filter(|incremental_snapshot_archive_info| {
2110                incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2111            })
2112            .collect::<Vec<_>>();
2113    incremental_snapshot_archives.sort_unstable();
2114    incremental_snapshot_archives.into_iter().next_back()
2115}
2116
2117pub fn purge_old_snapshot_archives(
2118    full_snapshot_archives_dir: impl AsRef<Path>,
2119    incremental_snapshot_archives_dir: impl AsRef<Path>,
2120    maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2121    maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2122) {
2123    info!(
2124        "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2125        full_snapshot_archives_dir.as_ref().display(),
2126        maximum_full_snapshot_archives_to_retain
2127    );
2128
2129    let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2130    full_snapshot_archives.sort_unstable();
2131    full_snapshot_archives.reverse();
2132
2133    let num_to_retain = full_snapshot_archives
2134        .len()
2135        .min(maximum_full_snapshot_archives_to_retain.get());
2136    trace!(
2137        "There are {} full snapshot archives, retaining {}",
2138        full_snapshot_archives.len(),
2139        num_to_retain,
2140    );
2141
2142    let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2143        if full_snapshot_archives.is_empty() {
2144            None
2145        } else {
2146            Some(full_snapshot_archives.split_at(num_to_retain))
2147        }
2148        .unwrap_or_default();
2149
2150    let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2151        .iter()
2152        .map(|ai| ai.slot())
2153        .collect::<HashSet<_>>();
2154
2155    fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2156        for path in archives.iter().map(|a| a.path()) {
2157            trace!("Removing snapshot archive: {}", path.display());
2158            let result = fs::remove_file(path);
2159            if let Err(err) = result {
2160                info!(
2161                    "Failed to remove snapshot archive '{}': {err}",
2162                    path.display()
2163                );
2164            }
2165        }
2166    }
2167    remove_archives(full_snapshot_archives_to_remove);
2168
2169    info!(
2170        "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2171        incremental_snapshot_archives_dir.as_ref().display(),
2172        maximum_incremental_snapshot_archives_to_retain
2173    );
2174    let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2175    for incremental_snapshot_archive in
2176        get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2177    {
2178        incremental_snapshot_archives_by_base_slot
2179            .entry(incremental_snapshot_archive.base_slot())
2180            .or_default()
2181            .push(incremental_snapshot_archive)
2182    }
2183
2184    let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2185    for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2186    {
2187        incremental_snapshot_archives.sort_unstable();
2188        let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2189            maximum_incremental_snapshot_archives_to_retain.get()
2190        } else {
2191            usize::from(retained_full_snapshot_slots.contains(&base_slot))
2192        };
2193        trace!(
2194            "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2195            incremental_snapshot_archives.len(),
2196            base_slot,
2197            incremental_snapshot_archives
2198                .len()
2199                .saturating_sub(num_to_retain),
2200        );
2201
2202        incremental_snapshot_archives.truncate(
2203            incremental_snapshot_archives
2204                .len()
2205                .saturating_sub(num_to_retain),
2206        );
2207        remove_archives(&incremental_snapshot_archives);
2208    }
2209}
2210
/// Unpacks a snapshot from `shared_buffer` into `ledger_dir` / `account_paths` using
/// `parallel_divisions` rayon workers.
///
/// Each worker reads the whole stream through its own `SharedBufferReader`. Its
/// `ParallelSelector` decides which entries it extracts. The per-worker maps are merged in
/// worker order.
///
/// # Panics
///
/// Panics if `parallel_divisions` is zero.
///
/// # Errors
///
/// Returns the first worker error, in worker order.
#[cfg(feature = "dev-context-only-utils")]
fn unpack_snapshot_local(
    shared_buffer: SharedBuffer,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_divisions > 0);

    // Every reader must exist before any of them starts consuming the shared buffer.
    let mut readers = Vec::with_capacity(parallel_divisions);
    for _ in 0..parallel_divisions {
        readers.push(SharedBufferReader::new(&shared_buffer));
    }

    let per_worker_results: Vec<_> = readers
        .into_par_iter()
        .enumerate()
        .map(|(index, reader)| {
            let mut archive = Archive::new(reader);
            hardened_unpack::unpack_snapshot(
                &mut archive,
                ledger_dir,
                account_paths,
                Some(ParallelSelector {
                    index,
                    divisions: parallel_divisions,
                }),
            )
        })
        .collect();

    let mut merged = UnpackedAppendVecMap::new();
    for worker_result in per_worker_results {
        merged.extend(worker_result?);
    }
    Ok(merged)
}
2251
2252fn untar_snapshot_create_shared_buffer(
2253    snapshot_tar: &Path,
2254    archive_format: ArchiveFormat,
2255) -> SharedBuffer {
2256    let open_file = || {
2257        fs::File::open(snapshot_tar)
2258            .map_err(|err| {
2259                IoError::other(format!(
2260                    "failed to open snapshot archive '{}': {err}",
2261                    snapshot_tar.display(),
2262                ))
2263            })
2264            .unwrap()
2265    };
2266    match archive_format {
2267        ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(BufReader::new(open_file()))),
2268        ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(BufReader::new(open_file()))),
2269        ArchiveFormat::TarZstd => SharedBuffer::new(
2270            zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
2271        ),
2272        ArchiveFormat::TarLz4 => {
2273            SharedBuffer::new(lz4::Decoder::new(BufReader::new(open_file())).unwrap())
2274        }
2275        ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
2276    }
2277}
2278
/// Decompresses `snapshot_tar` according to `archive_format` and unpacks it into `unpack_dir` and
/// `account_paths` using `parallel_divisions` workers.
#[cfg(feature = "dev-context-only-utils")]
fn untar_snapshot_in(
    snapshot_tar: impl AsRef<Path>,
    unpack_dir: &Path,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    let snapshot_tar = snapshot_tar.as_ref();
    let shared_buffer = untar_snapshot_create_shared_buffer(snapshot_tar, archive_format);
    unpack_snapshot_local(shared_buffer, unpack_dir, account_paths, parallel_divisions)
}
2290
2291pub fn verify_unpacked_snapshots_dir_and_version(
2292    unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2293) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2294    info!(
2295        "snapshot version: {}",
2296        &unpacked_snapshots_dir_and_version.snapshot_version
2297    );
2298
2299    let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2300    let mut bank_snapshots =
2301        get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2302    if bank_snapshots.len() > 1 {
2303        return Err(IoError::other(format!(
2304            "invalid snapshot format: only one snapshot allowed, but found {}",
2305            bank_snapshots.len(),
2306        ))
2307        .into());
2308    }
2309    let root_paths = bank_snapshots.pop().ok_or_else(|| {
2310        IoError::other(format!(
2311            "no snapshots found in snapshots directory '{}'",
2312            unpacked_snapshots_dir_and_version
2313                .unpacked_snapshots_dir
2314                .display(),
2315        ))
2316    })?;
2317    Ok((snapshot_version, root_paths))
2318}
2319
2320/// Returns the file name of the bank snapshot for `slot`
2321pub fn get_snapshot_file_name(slot: Slot) -> String {
2322    slot.to_string()
2323}
2324
2325/// Constructs the path to the bank snapshot directory for `slot` within `bank_snapshots_dir`
2326pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2327    bank_snapshots_dir
2328        .as_ref()
2329        .join(get_snapshot_file_name(slot))
2330}
2331
/// Lets tests state how the serialized bank is expected to relate to the one it is checked
/// against.
#[derive(Clone, Copy, Debug)]
pub enum VerifyBank {
    /// The serialized bank should be identical to the reference it is compared with.
    Deterministic,
    /// The bank was re-serialized into a non-deterministic format, so the files are
    /// deserialized and the resulting structures are compared instead of the raw bytes.
    NonDeterministic,
}
2341
/// Unpacks `snapshot_archive` into a temporary directory and asserts that the result matches
/// the bank snapshot tree at `snapshots_to_verify` for `slot`.
///
/// The account storage files reached through the symlinks in the `slot` accounts hardlinks
/// directory of `snapshots_to_verify` are copied into one directory. They are then compared
/// with the single accounts directory the archive was unpacked into.
///
/// NOTE: this mutates `snapshots_to_verify`. For `slot` it removes the accounts hardlinks
/// directory, the snapshot version file, and the state-complete file when they are present.
/// With `VerifyBank::NonDeterministic` it also removes the serialized bank file from both the
/// reference and the unpacked tree, after comparing their deserialized contents.
///
/// # Panics
///
/// Panics if unpacking or any filesystem operation fails, or if the compared contents differ.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    // Unpack with a single parallel division into one accounts directory.
    untar_snapshot_in(
        snapshot_archive,
        unpack_dir,
        &[unpack_account_dir.clone()],
        archive_format,
        1,
    )
    .unwrap();

    // Check snapshots are the same
    let unpacked_snapshots = unpack_dir.join("snapshots");

    // Since the unpack code collects all the appendvecs into one directory unpack_account_dir, we need to
    // collect all the appendvecs in account_paths/<slot>/snapshot/ into one directory for later comparison.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    // Create the directory if it doesn't exist
    fs::create_dir_all(&storages_to_verify).unwrap();

    // From here on `slot` is the slot's string form, used as a directory and file name.
    let slot = slot.to_string();
    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);

    if let VerifyBank::NonDeterministic = verify_bank {
        // file contents may be different, but deserialized structs should be equal
        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
        let p2 = unpacked_snapshots.join(&slot).join(&slot);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
        // Remove both serialized bank files so the directory diff below does not compare them.
        fs::remove_file(p1).unwrap();
        fs::remove_file(p2).unwrap();
    }

    // In a bank snapshot directory, the status_cache file is inside the slot directory,
    // together with the snapshot file. In an unpacked archive, the status_cache file is one
    // level up, outside the slot directory.
    // Move the unpacked status_cache file into the slot directory so that the directory
    // comparison passes.
    let existing_unpacked_status_cache_file =
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let new_unpacked_status_cache_file = unpacked_snapshots
        .join(&slot)
        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
    fs::rename(
        existing_unpacked_status_cache_file,
        new_unpacked_status_cache_file,
    )
    .unwrap();

    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        // This directory contains symlinks to all <account_path>/snapshot/<slot> directories.
        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
            // Copy all the files in dst_path into the storages_to_verify directory.
            for entry in fs::read_dir(&link_dst_path).unwrap() {
                let src_path = entry.unwrap().path();
                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
                fs::copy(src_path, dst_path).unwrap();
            }
        }
        // The archive does not contain this directory, so remove it before comparing.
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    // Remove the version file if it is present, before the directory comparison.
    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_path.is_file() {
        fs::remove_file(version_path).unwrap();
    }

    // Remove the state-complete marker file if it is present, before the directory comparison.
    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_path.is_file() {
        fs::remove_file(state_complete_path).unwrap();
    }

    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // In the unarchiving case, there is an extra empty "accounts" directory. The account
    // files from the archive's accounts/ directory have been expanded into [account_paths].
    // Remove the empty "accounts" directory for the directory comparison below.
    // In some test cases the directory to compare does not come from unarchiving.
    // Ignore the error when this directory does not exist.
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    // Check the account entries are the same
    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
2435
2436/// Purges all bank snapshots
2437pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2438    let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2439    purge_bank_snapshots(&bank_snapshots);
2440}
2441
2442/// Purges bank snapshots, retaining the newest `num_bank_snapshots_to_retain`
2443pub fn purge_old_bank_snapshots(
2444    bank_snapshots_dir: impl AsRef<Path>,
2445    num_bank_snapshots_to_retain: usize,
2446    filter_by_kind: Option<BankSnapshotKind>,
2447) {
2448    let mut bank_snapshots = match filter_by_kind {
2449        Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2450        Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2451        None => get_bank_snapshots(&bank_snapshots_dir),
2452    };
2453
2454    bank_snapshots.sort_unstable();
2455    purge_bank_snapshots(
2456        bank_snapshots
2457            .iter()
2458            .rev()
2459            .skip(num_bank_snapshots_to_retain),
2460    );
2461}
2462
2463/// At startup, purge old (i.e. unusable) bank snapshots
2464///
2465/// Only a single bank snapshot could be needed at startup (when using fast boot), so
2466/// retain the highest bank snapshot "post", and purge the rest.
2467pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2468    purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2469    purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2470
2471    let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2472    if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2473        debug!(
2474            "Retained bank snapshot for slot {}, and purged the rest.",
2475            highest_bank_snapshot_post.slot
2476        );
2477    }
2478}
2479
2480/// Purges bank snapshots that are older than `slot`
2481pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2482    let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2483    bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2484    purge_bank_snapshots(&bank_snapshots);
2485}
2486
2487/// Purges all `bank_snapshots`
2488///
2489/// Does not exit early if there is an error while purging a bank snapshot.
2490fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2491    for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2492        if purge_bank_snapshot(snapshot_dir).is_err() {
2493            warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2494        }
2495    }
2496}
2497
2498/// Remove the bank snapshot at this path
2499pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2500    const FN_ERR: &str = "failed to purge bank snapshot";
2501    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2502    if accounts_hardlinks_dir.is_dir() {
2503        // This directory contain symlinks to all accounts snapshot directories.
2504        // They should all be removed.
2505        let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2506            IoError::other(format!(
2507                "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2508                accounts_hardlinks_dir.display(),
2509            ))
2510        })?;
2511        for entry in read_dir {
2512            let accounts_hardlink_dir = entry?.path();
2513            let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2514                IoError::other(format!(
2515                    "{FN_ERR}: failed to read symlink '{}': {err}",
2516                    accounts_hardlink_dir.display(),
2517                ))
2518            })?;
2519            move_and_async_delete_path(&accounts_hardlink_dir);
2520        }
2521    }
2522    fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2523        IoError::other(format!(
2524            "{FN_ERR}: failed to remove dir '{}': {err}",
2525            bank_snapshot_dir.as_ref().display(),
2526        ))
2527    })?;
2528    Ok(())
2529}
2530
2531pub fn should_take_full_snapshot(
2532    block_height: Slot,
2533    full_snapshot_archive_interval_slots: Slot,
2534) -> bool {
2535    block_height % full_snapshot_archive_interval_slots == 0
2536}
2537
2538pub fn should_take_incremental_snapshot(
2539    block_height: Slot,
2540    incremental_snapshot_archive_interval_slots: Slot,
2541    latest_full_snapshot_slot: Option<Slot>,
2542) -> bool {
2543    block_height % incremental_snapshot_archive_interval_slots == 0
2544        && latest_full_snapshot_slot.is_some()
2545}
2546
/// Creates a temporary "accounts path" directory for tests.
///
/// Returns the temporary directory handle together with the accounts path created inside it.
/// That path is the first path returned by `create_accounts_run_and_snapshot_dirs`, which sets
/// up the "run" and "snapshot" sub-directories required by a validator. Keep the returned
/// `TempDir` alive for as long as the accounts path is in use.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = tempfile::TempDir::new().unwrap();
    let run_and_snapshot_dirs = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap();
    (tmp_dir, run_and_snapshot_dirs.0)
}
2557
2558#[cfg(test)]
2559mod tests {
2560    use {
2561        super::*,
2562        assert_matches::assert_matches,
2563        bincode::{deserialize_from, serialize_into},
2564        std::{convert::TryFrom, mem::size_of},
2565        tempfile::NamedTempFile,
2566    };
2567
2568    #[test]
2569    fn test_serialize_snapshot_data_file_under_limit() {
2570        let temp_dir = tempfile::TempDir::new().unwrap();
2571        let expected_consumed_size = size_of::<u32>() as u64;
2572        let consumed_size = serialize_snapshot_data_file_capped(
2573            &temp_dir.path().join("data-file"),
2574            expected_consumed_size,
2575            |stream| {
2576                serialize_into(stream, &2323_u32)?;
2577                Ok(())
2578            },
2579        )
2580        .unwrap();
2581        assert_eq!(consumed_size, expected_consumed_size);
2582    }
2583
2584    #[test]
2585    fn test_serialize_snapshot_data_file_over_limit() {
2586        let temp_dir = tempfile::TempDir::new().unwrap();
2587        let expected_consumed_size = size_of::<u32>() as u64;
2588        let result = serialize_snapshot_data_file_capped(
2589            &temp_dir.path().join("data-file"),
2590            expected_consumed_size - 1,
2591            |stream| {
2592                serialize_into(stream, &2323_u32)?;
2593                Ok(())
2594            },
2595        );
2596        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2597    }
2598
2599    #[test]
2600    fn test_deserialize_snapshot_data_file_under_limit() {
2601        let expected_data = 2323_u32;
2602        let expected_consumed_size = size_of::<u32>() as u64;
2603
2604        let temp_dir = tempfile::TempDir::new().unwrap();
2605        serialize_snapshot_data_file_capped(
2606            &temp_dir.path().join("data-file"),
2607            expected_consumed_size,
2608            |stream| {
2609                serialize_into(stream, &expected_data)?;
2610                Ok(())
2611            },
2612        )
2613        .unwrap();
2614
2615        let snapshot_root_paths = SnapshotRootPaths {
2616            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2617            incremental_snapshot_root_file_path: None,
2618        };
2619
2620        let actual_data = deserialize_snapshot_data_files_capped(
2621            &snapshot_root_paths,
2622            expected_consumed_size,
2623            |stream| {
2624                Ok(deserialize_from::<_, u32>(
2625                    &mut stream.full_snapshot_stream,
2626                )?)
2627            },
2628        )
2629        .unwrap();
2630        assert_eq!(actual_data, expected_data);
2631    }
2632
2633    #[test]
2634    fn test_deserialize_snapshot_data_file_over_limit() {
2635        let expected_data = 2323_u32;
2636        let expected_consumed_size = size_of::<u32>() as u64;
2637
2638        let temp_dir = tempfile::TempDir::new().unwrap();
2639        serialize_snapshot_data_file_capped(
2640            &temp_dir.path().join("data-file"),
2641            expected_consumed_size,
2642            |stream| {
2643                serialize_into(stream, &expected_data)?;
2644                Ok(())
2645            },
2646        )
2647        .unwrap();
2648
2649        let snapshot_root_paths = SnapshotRootPaths {
2650            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2651            incremental_snapshot_root_file_path: None,
2652        };
2653
2654        let result = deserialize_snapshot_data_files_capped(
2655            &snapshot_root_paths,
2656            expected_consumed_size - 1,
2657            |stream| {
2658                Ok(deserialize_from::<_, u32>(
2659                    &mut stream.full_snapshot_stream,
2660                )?)
2661            },
2662        );
2663        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2664    }
2665
2666    #[test]
2667    fn test_deserialize_snapshot_data_file_extra_data() {
2668        let expected_data = 2323_u32;
2669        let expected_consumed_size = size_of::<u32>() as u64;
2670
2671        let temp_dir = tempfile::TempDir::new().unwrap();
2672        serialize_snapshot_data_file_capped(
2673            &temp_dir.path().join("data-file"),
2674            expected_consumed_size * 2,
2675            |stream| {
2676                serialize_into(stream.by_ref(), &expected_data)?;
2677                serialize_into(stream.by_ref(), &expected_data)?;
2678                Ok(())
2679            },
2680        )
2681        .unwrap();
2682
2683        let snapshot_root_paths = SnapshotRootPaths {
2684            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2685            incremental_snapshot_root_file_path: None,
2686        };
2687
2688        let result = deserialize_snapshot_data_files_capped(
2689            &snapshot_root_paths,
2690            expected_consumed_size * 2,
2691            |stream| {
2692                Ok(deserialize_from::<_, u32>(
2693                    &mut stream.full_snapshot_stream,
2694                )?)
2695            },
2696        );
2697        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2698    }
2699
2700    #[test]
2701    fn test_snapshot_version_from_file_under_limit() {
2702        let file_content = SnapshotVersion::default().as_str();
2703        let mut file = NamedTempFile::new().unwrap();
2704        file.write_all(file_content.as_bytes()).unwrap();
2705        let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2706        assert_eq!(version_from_file, file_content);
2707    }
2708
2709    #[test]
2710    fn test_snapshot_version_from_file_over_limit() {
2711        let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2712        let file_content = vec![7u8; over_limit_size];
2713        let mut file = NamedTempFile::new().unwrap();
2714        file.write_all(&file_content).unwrap();
2715        assert_matches!(
2716            snapshot_version_from_file(file.path()),
2717            Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2718        );
2719    }
2720
2721    #[test]
2722    fn test_parse_full_snapshot_archive_filename() {
2723        assert_eq!(
2724            parse_full_snapshot_archive_filename(&format!(
2725                "snapshot-42-{}.tar.bz2",
2726                Hash::default()
2727            ))
2728            .unwrap(),
2729            (42, SnapshotHash(Hash::default()), ArchiveFormat::TarBzip2)
2730        );
2731        assert_eq!(
2732            parse_full_snapshot_archive_filename(&format!(
2733                "snapshot-43-{}.tar.zst",
2734                Hash::default()
2735            ))
2736            .unwrap(),
2737            (43, SnapshotHash(Hash::default()), ArchiveFormat::TarZstd)
2738        );
2739        assert_eq!(
2740            parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default()))
2741                .unwrap(),
2742            (44, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2743        );
2744        assert_eq!(
2745            parse_full_snapshot_archive_filename(&format!(
2746                "snapshot-45-{}.tar.lz4",
2747                Hash::default()
2748            ))
2749            .unwrap(),
2750            (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2751        );
2752
2753        assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2754        assert!(
2755            parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2756        );
2757
2758        assert!(
2759            parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
2760        );
2761        assert!(parse_full_snapshot_archive_filename(&format!(
2762            "snapshot-12345678-{}.bad!ext",
2763            Hash::new_unique()
2764        ))
2765        .is_err());
2766        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2767
2768        assert!(parse_full_snapshot_archive_filename(&format!(
2769            "snapshot-bad!slot-{}.bad!ext",
2770            Hash::new_unique()
2771        ))
2772        .is_err());
2773        assert!(parse_full_snapshot_archive_filename(&format!(
2774            "snapshot-12345678-{}.bad!ext",
2775            Hash::new_unique()
2776        ))
2777        .is_err());
2778        assert!(parse_full_snapshot_archive_filename(&format!(
2779            "snapshot-bad!slot-{}.tar",
2780            Hash::new_unique()
2781        ))
2782        .is_err());
2783
2784        assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err());
2785        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2786        assert!(parse_full_snapshot_archive_filename(&format!(
2787            "snapshot-bad!slot-{}.tar",
2788            Hash::new_unique()
2789        ))
2790        .is_err());
2791    }
2792
2793    #[test]
2794    fn test_parse_incremental_snapshot_archive_filename() {
2795        assert_eq!(
2796            parse_incremental_snapshot_archive_filename(&format!(
2797                "incremental-snapshot-42-123-{}.tar.bz2",
2798                Hash::default()
2799            ))
2800            .unwrap(),
2801            (
2802                42,
2803                123,
2804                SnapshotHash(Hash::default()),
2805                ArchiveFormat::TarBzip2
2806            )
2807        );
2808        assert_eq!(
2809            parse_incremental_snapshot_archive_filename(&format!(
2810                "incremental-snapshot-43-234-{}.tar.zst",
2811                Hash::default()
2812            ))
2813            .unwrap(),
2814            (
2815                43,
2816                234,
2817                SnapshotHash(Hash::default()),
2818                ArchiveFormat::TarZstd
2819            )
2820        );
2821        assert_eq!(
2822            parse_incremental_snapshot_archive_filename(&format!(
2823                "incremental-snapshot-44-345-{}.tar",
2824                Hash::default()
2825            ))
2826            .unwrap(),
2827            (44, 345, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2828        );
2829        assert_eq!(
2830            parse_incremental_snapshot_archive_filename(&format!(
2831                "incremental-snapshot-45-456-{}.tar.lz4",
2832                Hash::default()
2833            ))
2834            .unwrap(),
2835            (
2836                45,
2837                456,
2838                SnapshotHash(Hash::default()),
2839                ArchiveFormat::TarLz4
2840            )
2841        );
2842
2843        assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
2844        assert!(parse_incremental_snapshot_archive_filename(&format!(
2845            "snapshot-42-{}.tar",
2846            Hash::new_unique()
2847        ))
2848        .is_err());
2849        assert!(parse_incremental_snapshot_archive_filename(
2850            "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
2851        )
2852        .is_err());
2853
2854        assert!(parse_incremental_snapshot_archive_filename(&format!(
2855            "incremental-snapshot-bad!slot-56785678-{}.tar",
2856            Hash::new_unique()
2857        ))
2858        .is_err());
2859
2860        assert!(parse_incremental_snapshot_archive_filename(&format!(
2861            "incremental-snapshot-12345678-bad!slot-{}.tar",
2862            Hash::new_unique()
2863        ))
2864        .is_err());
2865
2866        assert!(parse_incremental_snapshot_archive_filename(
2867            "incremental-snapshot-12341234-56785678-bad!HASH.tar"
2868        )
2869        .is_err());
2870
2871        assert!(parse_incremental_snapshot_archive_filename(&format!(
2872            "incremental-snapshot-12341234-56785678-{}.bad!ext",
2873            Hash::new_unique()
2874        ))
2875        .is_err());
2876    }
2877
2878    #[test]
2879    fn test_check_are_snapshots_compatible() {
2880        let slot1: Slot = 1234;
2881        let slot2: Slot = 5678;
2882        let slot3: Slot = 999_999;
2883
2884        let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
2885            format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
2886        ))
2887        .unwrap();
2888
2889        assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
2890
2891        let incremental_snapshot_archive_info =
2892            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2893                "/dir/incremental-snapshot-{}-{}-{}.tar",
2894                slot1,
2895                slot2,
2896                Hash::new_unique()
2897            )))
2898            .unwrap();
2899
2900        assert!(check_are_snapshots_compatible(
2901            &full_snapshot_archive_info,
2902            Some(&incremental_snapshot_archive_info)
2903        )
2904        .is_ok());
2905
2906        let incremental_snapshot_archive_info =
2907            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2908                "/dir/incremental-snapshot-{}-{}-{}.tar",
2909                slot2,
2910                slot3,
2911                Hash::new_unique()
2912            )))
2913            .unwrap();
2914
2915        assert!(check_are_snapshots_compatible(
2916            &full_snapshot_archive_info,
2917            Some(&incremental_snapshot_archive_info)
2918        )
2919        .is_err());
2920    }
2921
2922    /// A test heler function that creates bank snapshot files
2923    fn common_create_bank_snapshot_files(
2924        bank_snapshots_dir: &Path,
2925        min_slot: Slot,
2926        max_slot: Slot,
2927    ) {
2928        for slot in min_slot..max_slot {
2929            let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
2930            fs::create_dir_all(&snapshot_dir).unwrap();
2931
2932            let snapshot_filename = get_snapshot_file_name(slot);
2933            let snapshot_path = snapshot_dir.join(snapshot_filename);
2934            fs::File::create(snapshot_path).unwrap();
2935
2936            let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2937            fs::File::create(status_cache_file).unwrap();
2938
2939            let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
2940            fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2941
2942            // Mark this directory complete so it can be used.  Check this flag first before selecting for deserialization.
2943            let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
2944            fs::File::create(state_complete_path).unwrap();
2945        }
2946    }
2947
2948    #[test]
2949    fn test_get_bank_snapshots() {
2950        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2951        let min_slot = 10;
2952        let max_slot = 20;
2953        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2954
2955        let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
2956        assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
2957    }
2958
2959    #[test]
2960    fn test_get_highest_bank_snapshot_post() {
2961        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2962        let min_slot = 99;
2963        let max_slot = 123;
2964        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2965
2966        let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
2967        assert!(highest_bank_snapshot.is_some());
2968        assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
2969    }
2970
2971    /// A test helper function that creates full and incremental snapshot archive files.  Creates
2972    /// full snapshot files in the range (`min_full_snapshot_slot`, `max_full_snapshot_slot`], and
2973    /// incremental snapshot files in the range (`min_incremental_snapshot_slot`,
2974    /// `max_incremental_snapshot_slot`].  Additionally, "bad" files are created for both full and
2975    /// incremental snapshots to ensure the tests properly filter them out.
2976    fn common_create_snapshot_archive_files(
2977        full_snapshot_archives_dir: &Path,
2978        incremental_snapshot_archives_dir: &Path,
2979        min_full_snapshot_slot: Slot,
2980        max_full_snapshot_slot: Slot,
2981        min_incremental_snapshot_slot: Slot,
2982        max_incremental_snapshot_slot: Slot,
2983    ) {
2984        fs::create_dir_all(full_snapshot_archives_dir).unwrap();
2985        fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
2986        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
2987            for incremental_snapshot_slot in
2988                min_incremental_snapshot_slot..max_incremental_snapshot_slot
2989            {
2990                let snapshot_filename = format!(
2991                    "incremental-snapshot-{}-{}-{}.tar",
2992                    full_snapshot_slot,
2993                    incremental_snapshot_slot,
2994                    Hash::default()
2995                );
2996                let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
2997                fs::File::create(snapshot_filepath).unwrap();
2998            }
2999
3000            let snapshot_filename =
3001                format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3002            let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3003            fs::File::create(snapshot_filepath).unwrap();
3004
3005            // Add in an incremental snapshot with a bad filename and high slot to ensure filename are filtered and sorted correctly
3006            let bad_filename = format!(
3007                "incremental-snapshot-{}-{}-bad!hash.tar",
3008                full_snapshot_slot,
3009                max_incremental_snapshot_slot + 1,
3010            );
3011            let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3012            fs::File::create(bad_filepath).unwrap();
3013        }
3014
3015        // Add in a snapshot with a bad filename and high slot to ensure filename are filtered and
3016        // sorted correctly
3017        let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
3018        let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3019        fs::File::create(bad_filepath).unwrap();
3020    }
3021
3022    #[test]
3023    fn test_get_full_snapshot_archives() {
3024        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3025        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3026        let min_slot = 123;
3027        let max_slot = 456;
3028        common_create_snapshot_archive_files(
3029            full_snapshot_archives_dir.path(),
3030            incremental_snapshot_archives_dir.path(),
3031            min_slot,
3032            max_slot,
3033            0,
3034            0,
3035        );
3036
3037        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3038        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3039    }
3040
3041    #[test]
3042    fn test_get_full_snapshot_archives_remote() {
3043        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3044        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3045        let min_slot = 123;
3046        let max_slot = 456;
3047        common_create_snapshot_archive_files(
3048            &full_snapshot_archives_dir
3049                .path()
3050                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3051            &incremental_snapshot_archives_dir
3052                .path()
3053                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3054            min_slot,
3055            max_slot,
3056            0,
3057            0,
3058        );
3059
3060        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3061        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3062        assert!(snapshot_archives.iter().all(|info| info.is_remote()));
3063    }
3064
3065    #[test]
3066    fn test_get_incremental_snapshot_archives() {
3067        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3068        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3069        let min_full_snapshot_slot = 12;
3070        let max_full_snapshot_slot = 23;
3071        let min_incremental_snapshot_slot = 34;
3072        let max_incremental_snapshot_slot = 45;
3073        common_create_snapshot_archive_files(
3074            full_snapshot_archives_dir.path(),
3075            incremental_snapshot_archives_dir.path(),
3076            min_full_snapshot_slot,
3077            max_full_snapshot_slot,
3078            min_incremental_snapshot_slot,
3079            max_incremental_snapshot_slot,
3080        );
3081
3082        let incremental_snapshot_archives =
3083            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3084        assert_eq!(
3085            incremental_snapshot_archives.len() as Slot,
3086            (max_full_snapshot_slot - min_full_snapshot_slot)
3087                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3088        );
3089    }
3090
3091    #[test]
3092    fn test_get_incremental_snapshot_archives_remote() {
3093        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3094        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3095        let min_full_snapshot_slot = 12;
3096        let max_full_snapshot_slot = 23;
3097        let min_incremental_snapshot_slot = 34;
3098        let max_incremental_snapshot_slot = 45;
3099        common_create_snapshot_archive_files(
3100            &full_snapshot_archives_dir
3101                .path()
3102                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3103            &incremental_snapshot_archives_dir
3104                .path()
3105                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3106            min_full_snapshot_slot,
3107            max_full_snapshot_slot,
3108            min_incremental_snapshot_slot,
3109            max_incremental_snapshot_slot,
3110        );
3111
3112        let incremental_snapshot_archives =
3113            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3114        assert_eq!(
3115            incremental_snapshot_archives.len() as Slot,
3116            (max_full_snapshot_slot - min_full_snapshot_slot)
3117                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3118        );
3119        assert!(incremental_snapshot_archives
3120            .iter()
3121            .all(|info| info.is_remote()));
3122    }
3123
3124    #[test]
3125    fn test_get_highest_full_snapshot_archive_slot() {
3126        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3127        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3128        let min_slot = 123;
3129        let max_slot = 456;
3130        common_create_snapshot_archive_files(
3131            full_snapshot_archives_dir.path(),
3132            incremental_snapshot_archives_dir.path(),
3133            min_slot,
3134            max_slot,
3135            0,
3136            0,
3137        );
3138
3139        assert_eq!(
3140            get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
3141            Some(max_slot - 1)
3142        );
3143    }
3144
3145    #[test]
3146    fn test_get_highest_incremental_snapshot_slot() {
3147        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3148        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3149        let min_full_snapshot_slot = 12;
3150        let max_full_snapshot_slot = 23;
3151        let min_incremental_snapshot_slot = 34;
3152        let max_incremental_snapshot_slot = 45;
3153        common_create_snapshot_archive_files(
3154            full_snapshot_archives_dir.path(),
3155            incremental_snapshot_archives_dir.path(),
3156            min_full_snapshot_slot,
3157            max_full_snapshot_slot,
3158            min_incremental_snapshot_slot,
3159            max_incremental_snapshot_slot,
3160        );
3161
3162        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3163            assert_eq!(
3164                get_highest_incremental_snapshot_archive_slot(
3165                    incremental_snapshot_archives_dir.path(),
3166                    full_snapshot_slot
3167                ),
3168                Some(max_incremental_snapshot_slot - 1)
3169            );
3170        }
3171
3172        assert_eq!(
3173            get_highest_incremental_snapshot_archive_slot(
3174                incremental_snapshot_archives_dir.path(),
3175                max_full_snapshot_slot
3176            ),
3177            None
3178        );
3179    }
3180
3181    fn common_test_purge_old_snapshot_archives(
3182        snapshot_names: &[&String],
3183        maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3184        maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3185        expected_snapshots: &[&String],
3186    ) {
3187        let temp_snap_dir = tempfile::TempDir::new().unwrap();
3188
3189        for snap_name in snapshot_names {
3190            let snap_path = temp_snap_dir.path().join(snap_name);
3191            let mut _snap_file = fs::File::create(snap_path);
3192        }
3193        purge_old_snapshot_archives(
3194            temp_snap_dir.path(),
3195            temp_snap_dir.path(),
3196            maximum_full_snapshot_archives_to_retain,
3197            maximum_incremental_snapshot_archives_to_retain,
3198        );
3199
3200        let mut retained_snaps = HashSet::new();
3201        for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3202            let entry_path_buf = entry.unwrap().path();
3203            let entry_path = entry_path_buf.as_path();
3204            let snapshot_name = entry_path
3205                .file_name()
3206                .unwrap()
3207                .to_str()
3208                .unwrap()
3209                .to_string();
3210            retained_snaps.insert(snapshot_name);
3211        }
3212
3213        for snap_name in expected_snapshots {
3214            assert!(
3215                retained_snaps.contains(snap_name.as_str()),
3216                "{snap_name} not found"
3217            );
3218        }
3219        assert_eq!(retained_snaps.len(), expected_snapshots.len());
3220    }
3221
3222    #[test]
3223    fn test_purge_old_full_snapshot_archives() {
3224        let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
3225        let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
3226        let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
3227        let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
3228
3229        // expecting only the newest to be retained
3230        let expected_snapshots = vec![&snap3_name];
3231        common_test_purge_old_snapshot_archives(
3232            &snapshot_names,
3233            NonZeroUsize::new(1).unwrap(),
3234            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3235            &expected_snapshots,
3236        );
3237
3238        // retaining 2, expecting the 2 newest to be retained
3239        let expected_snapshots = vec![&snap2_name, &snap3_name];
3240        common_test_purge_old_snapshot_archives(
3241            &snapshot_names,
3242            NonZeroUsize::new(2).unwrap(),
3243            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3244            &expected_snapshots,
3245        );
3246
3247        // retaining 3, all three should be retained
3248        let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
3249        common_test_purge_old_snapshot_archives(
3250            &snapshot_names,
3251            NonZeroUsize::new(3).unwrap(),
3252            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3253            &expected_snapshots,
3254        );
3255    }
3256
3257    /// Mimic a running node's behavior w.r.t. purging old snapshot archives.  Take snapshots in a
3258    /// loop, and periodically purge old snapshot archives.  After purging, check to make sure the
3259    /// snapshot archives on disk are correct.
3260    #[test]
3261    fn test_purge_old_full_snapshot_archives_in_the_loop() {
3262        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3263        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3264        let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
3265        let starting_slot: Slot = 42;
3266
3267        for slot in (starting_slot..).take(100) {
3268            let full_snapshot_archive_file_name =
3269                format!("snapshot-{}-{}.tar", slot, Hash::default());
3270            let full_snapshot_archive_path = full_snapshot_archives_dir
3271                .as_ref()
3272                .join(full_snapshot_archive_file_name);
3273            fs::File::create(full_snapshot_archive_path).unwrap();
3274
3275            // don't purge-and-check until enough snapshot archives have been created
3276            if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
3277                continue;
3278            }
3279
3280            // purge infrequently, so there will always be snapshot archives to purge
3281            if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
3282                continue;
3283            }
3284
3285            purge_old_snapshot_archives(
3286                &full_snapshot_archives_dir,
3287                &incremental_snapshot_archives_dir,
3288                maximum_snapshots_to_retain,
3289                NonZeroUsize::new(usize::MAX).unwrap(),
3290            );
3291            let mut full_snapshot_archives =
3292                get_full_snapshot_archives(&full_snapshot_archives_dir);
3293            full_snapshot_archives.sort_unstable();
3294            assert_eq!(
3295                full_snapshot_archives.len(),
3296                maximum_snapshots_to_retain.get()
3297            );
3298            assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
3299            for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
3300                assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
3301            }
3302        }
3303    }
3304
3305    #[test]
3306    fn test_purge_old_incremental_snapshot_archives() {
3307        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3308        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3309        let starting_slot = 100_000;
3310
3311        let maximum_incremental_snapshot_archives_to_retain =
3312            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3313        let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3314
3315        let incremental_snapshot_interval = 100;
3316        let num_incremental_snapshots_per_full_snapshot =
3317            maximum_incremental_snapshot_archives_to_retain.get() * 2;
3318        let full_snapshot_interval =
3319            incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
3320
3321        let mut snapshot_filenames = vec![];
3322        (starting_slot..)
3323            .step_by(full_snapshot_interval)
3324            .take(
3325                maximum_full_snapshot_archives_to_retain
3326                    .checked_mul(NonZeroUsize::new(2).unwrap())
3327                    .unwrap()
3328                    .get(),
3329            )
3330            .for_each(|full_snapshot_slot| {
3331                let snapshot_filename =
3332                    format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3333                let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
3334                fs::File::create(snapshot_path).unwrap();
3335                snapshot_filenames.push(snapshot_filename);
3336
3337                (full_snapshot_slot..)
3338                    .step_by(incremental_snapshot_interval)
3339                    .take(num_incremental_snapshots_per_full_snapshot)
3340                    .skip(1)
3341                    .for_each(|incremental_snapshot_slot| {
3342                        let snapshot_filename = format!(
3343                            "incremental-snapshot-{}-{}-{}.tar",
3344                            full_snapshot_slot,
3345                            incremental_snapshot_slot,
3346                            Hash::default()
3347                        );
3348                        let snapshot_path = incremental_snapshot_archives_dir
3349                            .path()
3350                            .join(&snapshot_filename);
3351                        fs::File::create(snapshot_path).unwrap();
3352                        snapshot_filenames.push(snapshot_filename);
3353                    });
3354            });
3355
3356        purge_old_snapshot_archives(
3357            full_snapshot_archives_dir.path(),
3358            incremental_snapshot_archives_dir.path(),
3359            maximum_full_snapshot_archives_to_retain,
3360            maximum_incremental_snapshot_archives_to_retain,
3361        );
3362
3363        // Ensure correct number of full snapshot archives are purged/retained
3364        let mut remaining_full_snapshot_archives =
3365            get_full_snapshot_archives(full_snapshot_archives_dir.path());
3366        assert_eq!(
3367            remaining_full_snapshot_archives.len(),
3368            maximum_full_snapshot_archives_to_retain.get(),
3369        );
3370        remaining_full_snapshot_archives.sort_unstable();
3371        let latest_full_snapshot_archive_slot =
3372            remaining_full_snapshot_archives.last().unwrap().slot();
3373
3374        // Ensure correct number of incremental snapshot archives are purged/retained
3375        // For each additional full snapshot archive, one additional (the newest)
3376        // incremental snapshot archive is retained. This is accounted for by the
3377        // `+ maximum_full_snapshot_archives_to_retain.saturating_sub(1)`
3378        let mut remaining_incremental_snapshot_archives =
3379            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3380        assert_eq!(
3381            remaining_incremental_snapshot_archives.len(),
3382            maximum_incremental_snapshot_archives_to_retain
3383                .get()
3384                .saturating_add(
3385                    maximum_full_snapshot_archives_to_retain
3386                        .get()
3387                        .saturating_sub(1)
3388                )
3389        );
3390        remaining_incremental_snapshot_archives.sort_unstable();
3391        remaining_incremental_snapshot_archives.reverse();
3392
3393        // Ensure there exists one incremental snapshot all but the latest full snapshot
3394        for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
3395            let incremental_snapshot_archive =
3396                remaining_incremental_snapshot_archives.pop().unwrap();
3397
3398            let expected_base_slot =
3399                latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
3400            assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
3401            let expected_slot = expected_base_slot
3402                + (full_snapshot_interval - incremental_snapshot_interval) as u64;
3403            assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
3404        }
3405
3406        // Ensure all remaining incremental snapshots are only for the latest full snapshot
3407        for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
3408            assert_eq!(
3409                incremental_snapshot_archive.base_slot(),
3410                latest_full_snapshot_archive_slot
3411            );
3412        }
3413
3414        // Ensure the remaining incremental snapshots are at the right slot
3415        let expected_remaining_incremental_snapshot_archive_slots =
3416            (latest_full_snapshot_archive_slot..)
3417                .step_by(incremental_snapshot_interval)
3418                .take(num_incremental_snapshots_per_full_snapshot)
3419                .skip(
3420                    num_incremental_snapshots_per_full_snapshot
3421                        - maximum_incremental_snapshot_archives_to_retain.get(),
3422                )
3423                .collect::<HashSet<_>>();
3424
3425        let actual_remaining_incremental_snapshot_archive_slots =
3426            remaining_incremental_snapshot_archives
3427                .iter()
3428                .map(|snapshot| snapshot.slot())
3429                .collect::<HashSet<_>>();
3430        assert_eq!(
3431            actual_remaining_incremental_snapshot_archive_slots,
3432            expected_remaining_incremental_snapshot_archive_slots
3433        );
3434    }
3435
3436    #[test]
3437    fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
3438        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3439        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3440
3441        for snapshot_filenames in [
3442            format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
3443            format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
3444            format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
3445            format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
3446            format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
3447            format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
3448            format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
3449            format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
3450        ] {
3451            let snapshot_path = incremental_snapshot_archives_dir
3452                .path()
3453                .join(snapshot_filenames);
3454            fs::File::create(snapshot_path).unwrap();
3455        }
3456
3457        purge_old_snapshot_archives(
3458            full_snapshot_archives_dir.path(),
3459            incremental_snapshot_archives_dir.path(),
3460            NonZeroUsize::new(usize::MAX).unwrap(),
3461            NonZeroUsize::new(usize::MAX).unwrap(),
3462        );
3463
3464        let remaining_incremental_snapshot_archives =
3465            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3466        assert!(remaining_incremental_snapshot_archives.is_empty());
3467    }
3468
3469    #[test]
3470    fn test_get_snapshot_accounts_hardlink_dir() {
3471        let slot: Slot = 1;
3472
3473        let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
3474
3475        let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
3476        let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
3477        let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
3478        fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
3479
3480        let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
3481        let appendvec_filename = format!("{slot}.0");
3482        let appendvec_path = accounts_dir.join(appendvec_filename);
3483
3484        let ret = get_snapshot_accounts_hardlink_dir(
3485            &appendvec_path,
3486            slot,
3487            &mut account_paths_set,
3488            &accounts_hardlinks_dir,
3489        );
3490        assert!(ret.is_ok());
3491
3492        let wrong_appendvec_path = appendvec_path
3493            .parent()
3494            .unwrap()
3495            .parent()
3496            .unwrap()
3497            .join(appendvec_path.file_name().unwrap());
3498        let ret = get_snapshot_accounts_hardlink_dir(
3499            &wrong_appendvec_path,
3500            slot,
3501            &mut account_paths_set,
3502            accounts_hardlinks_dir,
3503        );
3504
3505        assert_matches!(
3506            ret,
3507            Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
3508        );
3509    }
3510
3511    #[test]
3512    fn test_full_snapshot_slot_file_good() {
3513        let slot_written = 123_456_789;
3514        let bank_snapshot_dir = TempDir::new().unwrap();
3515        write_full_snapshot_slot_file(&bank_snapshot_dir, slot_written).unwrap();
3516
3517        let slot_read = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();
3518        assert_eq!(slot_read, slot_written);
3519    }
3520
3521    #[test]
3522    fn test_full_snapshot_slot_file_bad() {
3523        const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
3524        let too_small = [1u8; SLOT_SIZE - 1];
3525        let too_large = [1u8; SLOT_SIZE + 1];
3526
3527        for contents in [too_small.as_slice(), too_large.as_slice()] {
3528            let bank_snapshot_dir = TempDir::new().unwrap();
3529            let full_snapshot_slot_path = bank_snapshot_dir
3530                .as_ref()
3531                .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
3532            fs::write(full_snapshot_slot_path, contents).unwrap();
3533
3534            let err = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap_err();
3535            assert!(err
3536                .to_string()
3537                .starts_with("invalid full snapshot slot file size"));
3538        }
3539    }
3540}