// solana_runtime/snapshot_utils.rs

1use {
2    crate::{
3        bank::{BankFieldsToSerialize, BankHashStats, BankSlotDelta},
4        serde_snapshot::{
5            self, BankIncrementalSnapshotPersistence, ExtraFieldsToSerialize, SnapshotStreams,
6        },
7        snapshot_archive_info::{
8            FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
9            SnapshotArchiveInfoGetter,
10        },
11        snapshot_bank_utils,
12        snapshot_config::SnapshotConfig,
13        snapshot_hash::SnapshotHash,
14        snapshot_package::{SnapshotKind, SnapshotPackage},
15        snapshot_utils::snapshot_storage_rebuilder::{
16            RebuiltSnapshotStorage, SnapshotStorageRebuilder,
17        },
18    },
19    bzip2::bufread::BzDecoder,
20    crossbeam_channel::Sender,
21    flate2::read::GzDecoder,
22    lazy_static::lazy_static,
23    log::*,
24    regex::Regex,
25    solana_accounts_db::{
26        account_storage::{meta::StoredMetaWriteVersion, AccountStorageMap},
27        accounts_db::{AccountStorageEntry, AtomicAccountsFileId},
28        accounts_file::{AccountsFile, AccountsFileError, InternalsForArchive, StorageAccess},
29        accounts_hash::{AccountsDeltaHash, AccountsHash},
30        epoch_accounts_hash::EpochAccountsHash,
31        hardened_unpack::{self, ParallelSelector, UnpackError},
32        shared_buffer_reader::{SharedBuffer, SharedBufferReader},
33        utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
34    },
35    solana_measure::{measure::Measure, measure_time, measure_us},
36    solana_sdk::{
37        clock::{Epoch, Slot},
38        hash::Hash,
39    },
40    std::{
41        cmp::Ordering,
42        collections::{HashMap, HashSet},
43        fmt, fs,
44        io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
45        mem,
46        num::NonZeroUsize,
47        ops::RangeInclusive,
48        path::{Path, PathBuf},
49        process::ExitStatus,
50        str::FromStr,
51        sync::Arc,
52        thread::{Builder, JoinHandle},
53    },
54    tar::{self, Archive},
55    tempfile::TempDir,
56    thiserror::Error,
57};
58#[cfg(feature = "dev-context-only-utils")]
59use {
60    hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
61    solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
62};
63
64mod archive_format;
65pub mod snapshot_storage_rebuilder;
66pub use archive_format::*;
67
/// File name of the status cache file within a bank snapshot directory.
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
/// File name of the snapshot version file within a bank snapshot directory.
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
/// File name of the marker whose presence (as a regular file) flags a bank snapshot directory
/// as complete.
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
/// Directory, within a bank snapshot directory, holding symlinks to the account snapshot
/// hard-link directories.
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
/// Name of the snapshot archive download directory.
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
/// File name, within a bank snapshot directory, of the file holding the full snapshot slot as
/// little-endian bytes.
pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
/// Maximum size of a snapshot data file, in bytes.
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
/// Maximum size of a snapshot version file, in bytes.
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8; // bytes
/// Version string of `SnapshotVersion::V1_2_0`.
const VERSION_STRING_V1_2_0: &str = "1.2.0";
/// Name prefix of the temporary staging entries created while archiving a snapshot.
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
/// File extension that marks a bank snapshot file as `BankSnapshotKind::Pre`.
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
// `NonZeroUsize::new` is a `const fn`, so the defaults below are checked during constant
// evaluation: a zero value would reach the `panic!` and fail the build. No `unsafe` needed.
/// Default number of full snapshot archives to retain.
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(2) {
        Some(value) => value,
        None => panic!("DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN must be non-zero"),
    };
/// Default number of incremental snapshot archives to retain.
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(4) {
        Some(value) => value,
        None => panic!("DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN must be non-zero"),
    };
/// Matches full snapshot archive file names, capturing `slot`, `hash`, and `ext`.
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
/// Matches incremental snapshot archive file names, capturing `base`, `slot`, `hash`, and `ext`.
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
89
90#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
91pub enum SnapshotVersion {
92    #[default]
93    V1_2_0,
94}
95
96impl fmt::Display for SnapshotVersion {
97    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
98        f.write_str(From::from(*self))
99    }
100}
101
102impl From<SnapshotVersion> for &'static str {
103    fn from(snapshot_version: SnapshotVersion) -> &'static str {
104        match snapshot_version {
105            SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
106        }
107    }
108}
109
110impl FromStr for SnapshotVersion {
111    type Err = &'static str;
112
113    fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
114        // Remove leading 'v' or 'V' from slice
115        let version_string = if version_string
116            .get(..1)
117            .is_some_and(|s| s.eq_ignore_ascii_case("v"))
118        {
119            &version_string[1..]
120        } else {
121            version_string
122        };
123        match version_string {
124            VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
125            _ => Err("unsupported snapshot version"),
126        }
127    }
128}
129
130impl SnapshotVersion {
131    pub fn as_str(self) -> &'static str {
132        <&str as From<Self>>::from(self)
133    }
134}
135
136/// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and
137/// the kind of the snapshot.
138#[derive(PartialEq, Eq, Debug)]
139pub struct BankSnapshotInfo {
140    /// Slot of the bank
141    pub slot: Slot,
142    /// Snapshot kind
143    pub snapshot_kind: BankSnapshotKind,
144    /// Path to the bank snapshot directory
145    pub snapshot_dir: PathBuf,
146    /// Snapshot version
147    pub snapshot_version: SnapshotVersion,
148}
149
150impl PartialOrd for BankSnapshotInfo {
151    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
152        Some(self.cmp(other))
153    }
154}
155
156// Order BankSnapshotInfo by slot (ascending), which practically is sorting chronologically
157impl Ord for BankSnapshotInfo {
158    fn cmp(&self, other: &Self) -> Ordering {
159        self.slot.cmp(&other.slot)
160    }
161}
162
163impl BankSnapshotInfo {
164    pub fn new_from_dir(
165        bank_snapshots_dir: impl AsRef<Path>,
166        slot: Slot,
167    ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
168        // check this directory to see if there is a BankSnapshotPre and/or
169        // BankSnapshotPost file
170        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
171
172        if !bank_snapshot_dir.is_dir() {
173            return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
174                bank_snapshot_dir,
175            ));
176        }
177
178        // Among the files checks, the completion flag file check should be done first to avoid the later
179        // I/O errors.
180
181        // There is a time window from the slot directory being created, and the content being completely
182        // filled.  Check the completion to avoid using a highest found slot directory with missing content.
183        if !is_bank_snapshot_complete(&bank_snapshot_dir) {
184            return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
185        }
186
187        let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
188        if !status_cache_file.is_file() {
189            return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
190                status_cache_file,
191            ));
192        }
193
194        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
195        let version_str = snapshot_version_from_file(&version_path).or(Err(
196            SnapshotNewFromDirError::MissingVersionFile(version_path),
197        ))?;
198        let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
199            .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
200
201        let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
202        let bank_snapshot_pre_path =
203            bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
204
205        // NOTE: It is important that checking for "Pre" happens before "Post.
206        //
207        // Consider the scenario where AccountsHashVerifier is actively processing an
208        // AccountsPackage for a snapshot/slot; if AHV is in the middle of reserializing the
209        // bank snapshot file (writing the new "Post" file), and then the process dies,
210        // there will be an incomplete "Post" file on disk.  We do not want only the existence of
211        // this "Post" file to be sufficient for deciding the snapshot kind as "Post".  More so,
212        // "Post" *requires* the *absence* of a "Pre" file.
213        let snapshot_kind = if bank_snapshot_pre_path.is_file() {
214            BankSnapshotKind::Pre
215        } else if bank_snapshot_post_path.is_file() {
216            BankSnapshotKind::Post
217        } else {
218            return Err(SnapshotNewFromDirError::MissingSnapshotFile(
219                bank_snapshot_dir,
220            ));
221        };
222
223        Ok(BankSnapshotInfo {
224            slot,
225            snapshot_kind,
226            snapshot_dir: bank_snapshot_dir,
227            snapshot_version,
228        })
229    }
230
231    pub fn snapshot_path(&self) -> PathBuf {
232        let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
233
234        let ext = match self.snapshot_kind {
235            BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
236            BankSnapshotKind::Post => "",
237        };
238        bank_snapshot_path.set_extension(ext);
239
240        bank_snapshot_path
241    }
242}
243
/// Whether a bank snapshot contains the calculated accounts hash.
///
/// Bank snapshots traditionally had their accounts hash calculated before serialization. That
/// calculation takes a long time, so it has been offloaded. The bank serialization format did
/// not change, so this kind is the other way of telling whether a bank snapshot contains the
/// calculated accounts hash.
///
/// A freshly taken bank snapshot does not have the calculated accounts hash; it is "pre"
/// accounts hash. Once the accounts hash is calculated, the bank snapshot is re-serialized and
/// becomes "post" accounts hash.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BankSnapshotKind {
    /// The accounts hash has *not* been calculated for this bank snapshot yet.
    Pre,
    /// The accounts hash *has* been calculated for this bank snapshot.
    Post,
}
259
/// Which source a bank is constructed from when loading a snapshot.
///
/// Constructing a bank from a snapshot traditionally meant using a snapshot archive. A bank can
/// now also be constructed directly from a bank snapshot directory; this flag says which of the
/// two to use.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotFrom {
    /// Construct from the snapshot archive.
    Archive,
    /// Construct directly from the bank snapshot directory.
    Dir,
}
270
/// Root file paths used when rebuilding from snapshots.
///
/// Handles both rebuilding from just a full snapshot (no incremental root file path) and
/// rebuilding from a full snapshot together with an incremental snapshot.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    /// Root file path of the full snapshot.
    pub full_snapshot_root_file_path: PathBuf,
    /// Root file path of the incremental snapshot, if one is being used.
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}
278
279/// Helper type to bundle up the results from `unarchive_snapshot()`
280#[derive(Debug)]
281pub struct UnarchivedSnapshot {
282    #[allow(dead_code)]
283    unpack_dir: TempDir,
284    pub storage: AccountStorageMap,
285    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
286    pub measure_untar: Measure,
287}
288
289/// Helper type for passing around the unpacked snapshots dir and the snapshot version together
290#[derive(Debug)]
291pub struct UnpackedSnapshotsDirAndVersion {
292    pub unpacked_snapshots_dir: PathBuf,
293    pub snapshot_version: SnapshotVersion,
294}
295
296/// Helper type for passing around account storage map and next append vec id
297/// for reconstructing accounts from a snapshot
298pub(crate) struct StorageAndNextAccountsFileId {
299    pub storage: AccountStorageMap,
300    pub next_append_vec_id: AtomicAccountsFileId,
301}
302
303#[derive(Error, Debug)]
304#[allow(clippy::large_enum_variant)]
305pub enum SnapshotError {
306    #[error("I/O error: {0}")]
307    Io(#[from] IoError),
308
309    #[error("AccountsFile error: {0}")]
310    AccountsFileError(#[from] AccountsFileError),
311
312    #[error("serialization error: {0}")]
313    Serialize(#[from] bincode::Error),
314
315    #[error("crossbeam send error: {0}")]
316    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),
317
318    #[error("archive generation failure {0}")]
319    ArchiveGenerationFailure(ExitStatus),
320
321    #[error("Unpack error: {0}")]
322    UnpackError(#[from] UnpackError),
323
324    #[error("source({1}) - I/O error: {0}")]
325    IoWithSource(IoError, &'static str),
326
327    #[error("could not get file name from path '{0}'")]
328    PathToFileNameError(PathBuf),
329
330    #[error("could not get str from file name '{0}'")]
331    FileNameToStrError(PathBuf),
332
333    #[error("could not parse snapshot archive's file name '{0}'")]
334    ParseSnapshotArchiveFileNameError(String),
335
336    #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
337    MismatchedBaseSlot(Slot, Slot),
338
339    #[error("no snapshot archives to load from '{0}'")]
340    NoSnapshotArchives(PathBuf),
341
342    #[error("snapshot slot mismatch: deserialized bank: {0}, snapshot archive: {1}")]
343    MismatchedSlot(Slot, Slot),
344
345    #[error("snapshot hash mismatch: deserialized bank: {0:?}, snapshot archive: {1:?}")]
346    MismatchedHash(SnapshotHash, SnapshotHash),
347
348    #[error("snapshot slot deltas are invalid: {0}")]
349    VerifySlotDeltas(#[from] VerifySlotDeltasError),
350
351    #[error("snapshot epoch stakes are invalid: {0}")]
352    VerifyEpochStakes(#[from] VerifyEpochStakesError),
353
354    #[error("bank_snapshot_info new_from_dir failed: {0}")]
355    NewFromDir(#[from] SnapshotNewFromDirError),
356
357    #[error("invalid snapshot dir path '{0}'")]
358    InvalidSnapshotDirPath(PathBuf),
359
360    #[error("invalid AppendVec path '{0}'")]
361    InvalidAppendVecPath(PathBuf),
362
363    #[error("invalid account path '{0}'")]
364    InvalidAccountPath(PathBuf),
365
366    #[error("no valid snapshot dir found under '{0}'")]
367    NoSnapshotSlotDir(PathBuf),
368
369    #[error("snapshot dir account paths mismatching")]
370    AccountPathsMismatch,
371
372    #[error("failed to add bank snapshot for slot {1}: {0}")]
373    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),
374
375    #[error("failed to archive snapshot package: {0}")]
376    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),
377
378    #[error("failed to rebuild snapshot storages: {0}")]
379    RebuildStorages(String),
380}
381
382#[derive(Error, Debug)]
383pub enum SnapshotNewFromDirError {
384    #[error("invalid bank snapshot directory '{0}'")]
385    InvalidBankSnapshotDir(PathBuf),
386
387    #[error("missing status cache file '{0}'")]
388    MissingStatusCacheFile(PathBuf),
389
390    #[error("missing version file '{0}'")]
391    MissingVersionFile(PathBuf),
392
393    #[error("invalid snapshot version '{0}'")]
394    InvalidVersion(String),
395
396    #[error("snapshot directory incomplete '{0}'")]
397    IncompleteDir(PathBuf),
398
399    #[error("missing snapshot file '{0}'")]
400    MissingSnapshotFile(PathBuf),
401}
402
403pub type Result<T> = std::result::Result<T, SnapshotError>;
404
405/// Errors that can happen in `verify_slot_deltas()`
406#[derive(Error, Debug, PartialEq, Eq)]
407pub enum VerifySlotDeltasError {
408    #[error("too many entries: {0} (max: {1})")]
409    TooManyEntries(usize, usize),
410
411    #[error("slot {0} is not a root")]
412    SlotIsNotRoot(Slot),
413
414    #[error("slot {0} is greater than bank slot {1}")]
415    SlotGreaterThanMaxRoot(Slot, Slot),
416
417    #[error("slot {0} has multiple entries")]
418    SlotHasMultipleEntries(Slot),
419
420    #[error("slot {0} was not found in slot history")]
421    SlotNotFoundInHistory(Slot),
422
423    #[error("slot {0} was in history but missing from slot deltas")]
424    SlotNotFoundInDeltas(Slot),
425
426    #[error("slot history is bad and cannot be used to verify slot deltas")]
427    BadSlotHistory,
428}
429
430/// Errors that can happen in `verify_epoch_stakes()`
431#[derive(Error, Debug, PartialEq, Eq)]
432pub enum VerifyEpochStakesError {
433    #[error("epoch {0} is greater than the max {1}")]
434    EpochGreaterThanMax(Epoch, Epoch),
435
436    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
437    StakesNotFound(Epoch, RangeInclusive<Epoch>),
438}
439
440/// Errors that can happen in `add_bank_snapshot()`
441#[derive(Error, Debug)]
442pub enum AddBankSnapshotError {
443    #[error("bank snapshot dir already exists '{0}'")]
444    SnapshotDirAlreadyExists(PathBuf),
445
446    #[error("failed to create snapshot dir '{1}': {0}")]
447    CreateSnapshotDir(#[source] IoError, PathBuf),
448
449    #[error("failed to flush storage '{1}': {0}")]
450    FlushStorage(#[source] AccountsFileError, PathBuf),
451
452    #[error("failed to hard link storages: {0}")]
453    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),
454
455    #[error("failed to serialize bank: {0}")]
456    SerializeBank(#[source] Box<SnapshotError>),
457
458    #[error("failed to serialize status cache: {0}")]
459    SerializeStatusCache(#[source] Box<SnapshotError>),
460
461    #[error("failed to write snapshot version file '{1}': {0}")]
462    WriteSnapshotVersionFile(#[source] IoError, PathBuf),
463
464    #[error("failed to mark snapshot as 'complete': failed to create file '{1}': {0}")]
465    CreateStateCompleteFile(#[source] IoError, PathBuf),
466}
467
468/// Errors that can happen in `archive_snapshot_package()`
469#[derive(Error, Debug)]
470pub enum ArchiveSnapshotPackageError {
471    #[error("failed to create archive path '{1}': {0}")]
472    CreateArchiveDir(#[source] IoError, PathBuf),
473
474    #[error("failed to create staging dir inside '{1}': {0}")]
475    CreateStagingDir(#[source] IoError, PathBuf),
476
477    #[error("failed to create snapshot staging dir '{1}': {0}")]
478    CreateSnapshotStagingDir(#[source] IoError, PathBuf),
479
480    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
481    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),
482
483    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
484    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),
485
486    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
487    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),
488
489    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
490    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),
491
492    #[error("failed to create archive file '{1}': {0}")]
493    CreateArchiveFile(#[source] IoError, PathBuf),
494
495    #[error("failed to archive version file: {0}")]
496    ArchiveVersionFile(#[source] IoError),
497
498    #[error("failed to archive snapshots dir: {0}")]
499    ArchiveSnapshotsDir(#[source] IoError),
500
501    #[error("failed to archive account storage file '{1}': {0}")]
502    ArchiveAccountStorageFile(#[source] IoError, PathBuf),
503
504    #[error("failed to archive snapshot: {0}")]
505    FinishArchive(#[source] IoError),
506
507    #[error("failed to create encoder: {0}")]
508    CreateEncoder(#[source] IoError),
509
510    #[error("failed to encode archive: {0}")]
511    FinishEncoder(#[source] IoError),
512
513    #[error("failed to query archive metadata '{1}': {0}")]
514    QueryArchiveMetadata(#[source] IoError, PathBuf),
515
516    #[error("failed to move archive from '{1}' to '{2}': {0}")]
517    MoveArchive(#[source] IoError, PathBuf, PathBuf),
518}
519
520/// Errors that can happen in `hard_link_storages_to_snapshot()`
521#[derive(Error, Debug)]
522pub enum HardLinkStoragesToSnapshotError {
523    #[error("failed to create accounts hard links dir '{1}': {0}")]
524    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),
525
526    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
527    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),
528
529    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
530    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
531}
532
533/// Errors that can happen in `get_snapshot_accounts_hardlink_dir()`
534#[derive(Error, Debug)]
535pub enum GetSnapshotAccountsHardLinkDirError {
536    #[error("invalid account storage path '{0}'")]
537    GetAccountPath(PathBuf),
538
539    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
540    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),
541
542    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
543    SymlinkSnapshotHardLinkDir {
544        source: IoError,
545        original: PathBuf,
546        link: PathBuf,
547    },
548}
549
550/// The account snapshot directories under <account_path>/snapshot/<slot> contain account files hardlinked
551/// from <account_path>/run taken at snapshot <slot> time.  They are referenced by the symlinks from the
552/// bank snapshot dir snapshot/<slot>/accounts_hardlinks/.  We observed that sometimes the bank snapshot dir
553/// could be deleted but the account snapshot directories were left behind, possibly by some manual operations
554/// or some legacy code not using the symlinks to clean up the account snapshot hardlink directories.
555/// This function cleans up any account snapshot directories that are no longer referenced by the bank
556/// snapshot dirs, to ensure proper snapshot operations.
557pub fn clean_orphaned_account_snapshot_dirs(
558    bank_snapshots_dir: impl AsRef<Path>,
559    account_snapshot_paths: &[PathBuf],
560) -> IoResult<()> {
561    // Create the HashSet of the account snapshot hardlink directories referenced by the snapshot dirs.
562    // This is used to clean up any hardlinks that are no longer referenced by the snapshot dirs.
563    let mut account_snapshot_dirs_referenced = HashSet::new();
564    let snapshots = get_bank_snapshots(bank_snapshots_dir);
565    for snapshot in snapshots {
566        let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
567        // loop through entries in the snapshot_hardlink_dir, read the symlinks, add the target to the HashSet
568        let read_dir = fs::read_dir(&account_hardlinks_dir).map_err(|err| {
569            IoError::other(format!(
570                "failed to read account hardlinks dir '{}': {err}",
571                account_hardlinks_dir.display(),
572            ))
573        })?;
574        for entry in read_dir {
575            let path = entry?.path();
576            let target = fs::read_link(&path).map_err(|err| {
577                IoError::other(format!(
578                    "failed to read symlink '{}': {err}",
579                    path.display(),
580                ))
581            })?;
582            account_snapshot_dirs_referenced.insert(target);
583        }
584    }
585
586    // loop through the account snapshot hardlink directories, if the directory is not in the account_snapshot_dirs_referenced set, delete it
587    for account_snapshot_path in account_snapshot_paths {
588        let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
589            IoError::other(format!(
590                "failed to read account snapshot dir '{}': {err}",
591                account_snapshot_path.display(),
592            ))
593        })?;
594        for entry in read_dir {
595            let path = entry?.path();
596            if !account_snapshot_dirs_referenced.contains(&path) {
597                info!(
598                    "Removing orphaned account snapshot hardlink directory '{}'...",
599                    path.display()
600                );
601                move_and_async_delete_path(&path);
602            }
603        }
604    }
605
606    Ok(())
607}
608
609/// Purges incomplete bank snapshots
610pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
611    let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
612        // If we cannot read the bank snapshots dir, then there's nothing to do
613        return;
614    };
615
616    let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
617
618    let incomplete_dirs: Vec<_> = read_dir_iter
619        .filter_map(|entry| entry.ok())
620        .map(|entry| entry.path())
621        .filter(|path| path.is_dir())
622        .filter(is_incomplete)
623        .collect();
624
625    // attempt to purge all the incomplete directories; do not exit early
626    for incomplete_dir in incomplete_dirs {
627        let result = purge_bank_snapshot(&incomplete_dir);
628        match result {
629            Ok(_) => info!(
630                "Purged incomplete snapshot dir: {}",
631                incomplete_dir.display()
632            ),
633            Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
634        }
635    }
636}
637
638/// Is the bank snapshot complete?
639fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
640    let state_complete_path = bank_snapshot_dir
641        .as_ref()
642        .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
643    state_complete_path.is_file()
644}
645
646/// Writes the full snapshot slot file into the bank snapshot dir
647pub fn write_full_snapshot_slot_file(
648    bank_snapshot_dir: impl AsRef<Path>,
649    full_snapshot_slot: Slot,
650) -> IoResult<()> {
651    let full_snapshot_slot_path = bank_snapshot_dir
652        .as_ref()
653        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
654    fs::write(
655        &full_snapshot_slot_path,
656        Slot::to_le_bytes(full_snapshot_slot),
657    )
658    .map_err(|err| {
659        IoError::other(format!(
660            "failed to write full snapshot slot file '{}': {err}",
661            full_snapshot_slot_path.display(),
662        ))
663    })
664}
665
666// Reads the full snapshot slot file from the bank snapshot dir
667pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<Slot> {
668    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
669    let full_snapshot_slot_path = bank_snapshot_dir
670        .as_ref()
671        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
672    let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
673    if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
674        let error_message = format!(
675            "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
676            full_snapshot_slot_path.display(),
677            full_snapshot_slot_file_metadata.len(),
678            SLOT_SIZE,
679        );
680        return Err(IoError::other(error_message));
681    }
682    let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
683    let mut buffer = [0; SLOT_SIZE];
684    full_snapshot_slot_file.read_exact(&mut buffer)?;
685    let slot = Slot::from_le_bytes(buffer);
686    Ok(slot)
687}
688
689/// Gets the highest, loadable, bank snapshot
690///
691/// The highest bank snapshot is the one with the highest slot.
692/// To be loadable, the bank snapshot must be a BankSnapshotKind::Post.
693/// And if we're generating snapshots (e.g. running a normal validator), then
694/// the full snapshot file's slot must match the highest full snapshot archive's.
695pub fn get_highest_loadable_bank_snapshot(
696    snapshot_config: &SnapshotConfig,
697) -> Option<BankSnapshotInfo> {
698    let highest_bank_snapshot =
699        get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
700
701    // If we're *not* generating snapshots, e.g. running ledger-tool, then we *can* load
702    // this bank snapshot, and we do not need to check for anything else.
703    if !snapshot_config.should_generate_snapshots() {
704        return Some(highest_bank_snapshot);
705    }
706
707    // Otherwise, the bank snapshot's full snapshot slot *must* be the same as
708    // the highest full snapshot archive's slot.
709    let highest_full_snapshot_archive_slot =
710        get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
711    let full_snapshot_file_slot =
712        read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
713    (full_snapshot_file_slot == highest_full_snapshot_archive_slot).then_some(highest_bank_snapshot)
714}
715
716/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
717/// directory won't be cleaned up.  Call this function to clean them up.
718pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
719    if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
720        for entry in entries.flatten() {
721            if entry
722                .file_name()
723                .to_str()
724                .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
725                .unwrap_or(false)
726            {
727                let path = entry.path();
728                let result = if path.is_dir() {
729                    fs::remove_dir_all(&path)
730                } else {
731                    fs::remove_file(&path)
732                };
733                if let Err(err) = result {
734                    warn!(
735                        "Failed to remove temporary snapshot archive '{}': {err}",
736                        path.display(),
737                    );
738                }
739            }
740        }
741    }
742}
743
744/// Serializes and archives a snapshot package
745pub fn serialize_and_archive_snapshot_package(
746    snapshot_package: SnapshotPackage,
747    snapshot_config: &SnapshotConfig,
748) -> Result<SnapshotArchiveInfo> {
749    let SnapshotPackage {
750        snapshot_kind,
751        slot: snapshot_slot,
752        block_height,
753        hash: snapshot_hash,
754        mut snapshot_storages,
755        status_cache_slot_deltas,
756        bank_fields_to_serialize,
757        bank_hash_stats,
758        accounts_delta_hash,
759        accounts_hash,
760        epoch_accounts_hash,
761        bank_incremental_snapshot_persistence,
762        write_version,
763        enqueued: _,
764    } = snapshot_package;
765
766    let bank_snapshot_info = serialize_snapshot(
767        &snapshot_config.bank_snapshots_dir,
768        snapshot_config.snapshot_version,
769        snapshot_storages.as_slice(),
770        status_cache_slot_deltas.as_slice(),
771        bank_fields_to_serialize,
772        bank_hash_stats,
773        accounts_delta_hash,
774        accounts_hash,
775        epoch_accounts_hash,
776        bank_incremental_snapshot_persistence.as_ref(),
777        write_version,
778    )?;
779
780    // now write the full snapshot slot file after serializing so this bank snapshot is loadable
781    let full_snapshot_archive_slot = match snapshot_kind {
782        SnapshotKind::FullSnapshot => snapshot_slot,
783        SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
784    };
785    write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
786        .map_err(|err| {
787            IoError::other(format!(
788                "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, kind {snapshot_kind:?}: {err}",
789            ))
790        })?;
791
792    let snapshot_archive_path = match snapshot_package.snapshot_kind {
793        SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
794            &snapshot_config.full_snapshot_archives_dir,
795            snapshot_package.slot,
796            &snapshot_package.hash,
797            snapshot_config.archive_format,
798        ),
799        SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
800            // After the snapshot has been serialized, it is now safe (and required) to prune all
801            // the storages that are *not* to be archived for this incremental snapshot.
802            snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
803            build_incremental_snapshot_archive_path(
804                &snapshot_config.incremental_snapshot_archives_dir,
805                incremental_snapshot_base_slot,
806                snapshot_package.slot,
807                &snapshot_package.hash,
808                snapshot_config.archive_format,
809            )
810        }
811    };
812
813    let snapshot_archive_info = archive_snapshot(
814        snapshot_kind,
815        snapshot_slot,
816        snapshot_hash,
817        snapshot_storages.as_slice(),
818        &bank_snapshot_info.snapshot_dir,
819        snapshot_archive_path,
820        snapshot_config.archive_format,
821    )?;
822
823    Ok(snapshot_archive_info)
824}
825
/// Serializes a snapshot into `bank_snapshots_dir`
///
/// Creates the per-slot bank snapshot directory returned by
/// `get_bank_snapshot_dir(bank_snapshots_dir, slot)` (where `slot` is `bank_fields.slot`) and
/// populates it in this order:
/// 1. flushes every storage in `snapshot_storages`,
/// 2. hard-links those storages into the directory (`hard_link_storages_to_snapshot()`),
/// 3. serializes the bank into the file named `get_snapshot_file_name(slot)`,
/// 4. serializes `slot_deltas` into `SNAPSHOT_STATUS_CACHE_FILENAME`,
/// 5. writes `snapshot_version` into `SNAPSHOT_VERSION_FILENAME`,
/// 6. creates the empty `SNAPSHOT_STATE_COMPLETE_FILENAME` marker file, which is written last.
///
/// Timing and size metrics are reported through the `snapshot_bank` datapoint.
///
/// On success, returns a `BankSnapshotInfo` of kind `BankSnapshotKind::Pre` for the new
/// directory.
///
/// # Errors
///
/// Every failure is reported as `SnapshotError::AddBankSnapshot(_, slot)`. In particular, this
/// fails with `AddBankSnapshotError::SnapshotDirAlreadyExists` if the bank snapshot directory
/// already exists. This function does not remove a directory left behind by a failure part-way
/// through. Such a directory lacks the state-complete marker, because that marker is written last.
// Allowed because each piece of bank/accounts state to serialize is a separate argument.
#[allow(clippy::too_many_arguments)]
fn serialize_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    snapshot_version: SnapshotVersion,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    slot_deltas: &[BankSlotDelta],
    mut bank_fields: BankFieldsToSerialize,
    bank_hash_stats: BankHashStats,
    accounts_delta_hash: AccountsDeltaHash,
    accounts_hash: AccountsHash,
    epoch_accounts_hash: Option<EpochAccountsHash>,
    bank_incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
    write_version: StoredMetaWriteVersion,
) -> Result<BankSnapshotInfo> {
    let slot = bank_fields.slot;

    // this lambda function is to facilitate converting between
    // the AddBankSnapshotError and SnapshotError types
    let do_serialize_snapshot = || {
        let mut measure_everything = Measure::start("");
        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
        // Refuse to overwrite an existing bank snapshot for this slot.
        if bank_snapshot_dir.exists() {
            return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
                bank_snapshot_dir,
            ));
        }
        fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
            AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
        })?;

        // the bank snapshot is stored as bank_snapshots_dir/slot/slot
        let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
        info!(
            "Creating bank snapshot for slot {slot} at '{}'",
            bank_snapshot_path.display(),
        );

        // Every storage is flushed before it is hard-linked below. NOTE(review): presumably this
        // makes sure the linked files contain all written account data; confirm.
        let (_, flush_storages_us) = measure_us!({
            for storage in snapshot_storages {
                storage.flush().map_err(|err| {
                    AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
                })?;
            }
        });

        // We are constructing the snapshot directory to contain the full snapshot state information to allow
        // constructing a bank from this directory.  It acts like an archive to include the full state.
        // The set of the account storages files is the necessary part of this snapshot state.  Hard-link them
        // from the operational accounts/ directory to here.
        let (_, hard_link_storages_us) = measure_us!(hard_link_storages_to_snapshot(
            &bank_snapshot_dir,
            slot,
            snapshot_storages
        )
        .map_err(AddBankSnapshotError::HardLinkStorages)?);

        // Writes the bank itself. `versioned_epoch_stakes` is moved out of `bank_fields` with
        // `mem::take` (leaving the default value behind) rather than cloned, so it is passed
        // via `extra_fields` while `bank_fields` no longer carries it.
        let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
            let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
            let extra_fields = ExtraFieldsToSerialize {
                lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
                incremental_snapshot_persistence: bank_incremental_snapshot_persistence,
                epoch_accounts_hash,
                versioned_epoch_stakes,
                accounts_lt_hash: bank_fields.accounts_lt_hash.clone().map(Into::into),
            };
            serde_snapshot::serialize_bank_snapshot_into(
                stream,
                bank_fields,
                bank_hash_stats,
                accounts_delta_hash,
                accounts_hash,
                &get_storages_to_serialize(snapshot_storages),
                extra_fields,
                write_version,
            )?;
            Ok(())
        };
        // Size-capped at MAX_SNAPSHOT_DATA_FILE_SIZE by serialize_snapshot_data_file().
        let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
            serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
                .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
            "bank serialize"
        );

        let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
        let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
            snapshot_bank_utils::serialize_status_cache(slot_deltas, &status_cache_path)
                .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
        );

        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
        let (_, write_version_file_us) = measure_us!(fs::write(
            &version_path,
            snapshot_version.as_str().as_bytes(),
        )
        .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);

        // Mark this directory complete so it can be used.  Check this flag first before selecting for deserialization.
        let state_complete_path = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
        let (_, write_state_complete_file_us) = measure_us!(fs::File::create(&state_complete_path)
            .map_err(|err| {
                AddBankSnapshotError::CreateStateCompleteFile(err, state_complete_path)
            })?);

        measure_everything.stop();

        // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
        datapoint_info!(
            "snapshot_bank",
            ("slot", slot, i64),
            ("bank_size", bank_snapshot_consumed_size, i64),
            ("status_cache_size", status_cache_consumed_size, i64),
            ("flush_storages_us", flush_storages_us, i64),
            ("hard_link_storages_us", hard_link_storages_us, i64),
            ("bank_serialize_us", bank_serialize.as_us(), i64),
            ("status_cache_serialize_us", status_cache_serialize_us, i64),
            ("write_version_file_us", write_version_file_us, i64),
            (
                "write_state_complete_file_us",
                write_state_complete_file_us,
                i64
            ),
            ("total_us", measure_everything.as_us(), i64),
        );

        info!(
            "{} for slot {} at {}",
            bank_serialize,
            slot,
            bank_snapshot_path.display(),
        );

        Ok(BankSnapshotInfo {
            slot,
            snapshot_kind: BankSnapshotKind::Pre,
            snapshot_dir: bank_snapshot_dir,
            snapshot_version,
        })
    };

    do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
}
968
/// Archives a snapshot into `archive_path`
///
/// Steps:
/// 1. Creates the parent directory of `archive_path` (the "tar dir") if needed.
/// 2. Creates a temporary staging directory inside it, whose name starts with
///    `TMP_SNAPSHOT_ARCHIVE_PREFIX`, the slot, and `-`. The staging directory is populated with
///    symlinks to files in `bank_snapshot_dir`:
///    - `<staging>/SNAPSHOT_VERSION_FILENAME`
///    - `<staging>/snapshots/SNAPSHOT_STATUS_CACHE_FILENAME`
///    - `<staging>/snapshots/<slot>/<slot>` (the serialized bank)
/// 3. Writes a tar archive, compressed according to `archive_format`, to the temporary file
///    `TMP_SNAPSHOT_ARCHIVE_PREFIX<slot>.<extension>` in the tar dir. Entries are written in this
///    order: the version file, the `snapshots/` directory, then one `accounts/<file name>` entry
///    per storage in `snapshot_storages`.
/// 4. Renames the temporary file to `archive_path` and reports the
///    `archive-snapshot-package` datapoint.
///
/// # Panics
///
/// Panics if `archive_path` has no parent directory.
///
/// # Errors
///
/// Returns an `ArchiveSnapshotPackageError` variant, converted by `?` into this function's
/// error type, if any filesystem, symlink, archive, encoder or rename step fails. This function
/// does not remove the temporary archive file on failure.
fn archive_snapshot(
    snapshot_kind: SnapshotKind,
    snapshot_slot: Slot,
    snapshot_hash: SnapshotHash,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    bank_snapshot_dir: impl AsRef<Path>,
    archive_path: impl AsRef<Path>,
    archive_format: ArchiveFormat,
) -> Result<SnapshotArchiveInfo> {
    use ArchiveSnapshotPackageError as E;
    // Top-level directory names used inside the archive
    const SNAPSHOTS_DIR: &str = "snapshots";
    const ACCOUNTS_DIR: &str = "accounts";
    info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");

    let mut timer = Measure::start("snapshot_package-package_snapshots");
    // The staging directory and the staging archive file live next to the final archive.
    // NOTE(review): presumably this keeps the final `fs::rename` within one directory (and so
    // one filesystem); confirm.
    let tar_dir = archive_path
        .as_ref()
        .parent()
        .expect("Tar output path is invalid");

    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;

    // Create the staging directories
    // `staging_dir` is a `tempfile::TempDir`; per tempfile's documentation it is deleted when it
    // is dropped at the end of this function.
    let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
    let staging_dir = tempfile::Builder::new()
        .prefix(&format!("{}{}-", staging_dir_prefix, snapshot_slot))
        .tempdir_in(tar_dir)
        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
    let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);

    let slot_str = snapshot_slot.to_string();
    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
    // Creates staging snapshots/<slot>/
    fs::create_dir_all(&staging_snapshot_dir)
        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;

    // To be a source for symlinking and archiving, the path need to be an absolute path
    let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
        E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
    })?;
    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
    let src_snapshot_file = src_snapshot_dir.join(slot_str);
    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;

    // Following the existing archive format, the status cache is under snapshots/, not under <slot>/
    // like in the snapshot dir.
    let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    symlink::symlink_file(&src_status_cache, &staging_status_cache)
        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;

    // The bank snapshot has the version file, so symlink it to the correct staging path
    let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
    let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
    })?;

    // Tar the staging directory into the archive at `staging_archive_path`
    // NOTE(review): this name depends only on the prefix, the slot and the archive format (not on
    // `snapshot_kind`). A full and an incremental archive for the same slot and format built
    // concurrently would therefore share it. Confirm callers never do that.
    let staging_archive_path = tar_dir.join(format!(
        "{}{}.{}",
        staging_dir_prefix,
        snapshot_slot,
        archive_format.extension(),
    ));

    // Scoped so the archive file (owned by the encoder, or used directly for plain tar) is
    // closed before its metadata is read and it is renamed below.
    {
        let mut archive_file = fs::File::create(&staging_archive_path)
            .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;

        // Writes all tar entries into `encoder`; shared by every archive format below.
        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
            let mut archive = tar::Builder::new(encoder);
            // Serialize the version and snapshots files before accounts so we can quickly determine the version
            // and other bank fields. This is necessary if we want to interleave unpacking with reconstruction
            archive
                .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
                .map_err(E::ArchiveVersionFile)?;
            archive
                .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
                .map_err(E::ArchiveSnapshotsDir)?;

            // Each storage is archived under a name derived from its slot and id, not from its
            // on-disk file name.
            for storage in snapshot_storages {
                let path_in_archive = Path::new(ACCOUNTS_DIR)
                    .join(AccountsFile::file_name(storage.slot(), storage.id()));
                match storage.accounts.internals_for_archive() {
                    // Memory-mapped storage: append the mapped bytes directly with a hand-built
                    // header. The entry size comes from `storage.capacity()`.
                    // NOTE(review): this assumes `data` holds exactly `capacity()` bytes; confirm.
                    InternalsForArchive::Mmap(data) => {
                        let mut header = tar::Header::new_gnu();
                        header.set_path(path_in_archive).map_err(|err| {
                            E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
                        })?;
                        header.set_size(storage.capacity());
                        header.set_cksum();
                        archive.append(&header, data)
                    }
                    // File-backed storage: let tar read the file from `path`.
                    InternalsForArchive::FileIo(path) => {
                        archive.append_path_with_name(path, path_in_archive)
                    }
                }
                .map_err(|err| E::ArchiveAccountStorageFile(err, storage.path().to_path_buf()))?;
            }

            archive.into_inner().map_err(E::FinishArchive)?;
            Ok(())
        };

        match archive_format {
            ArchiveFormat::TarBzip2 => {
                let mut encoder =
                    bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::best());
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarGzip => {
                let mut encoder =
                    flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarZstd { config } => {
                let mut encoder =
                    zstd::stream::Encoder::new(archive_file, config.compression_level)
                        .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarLz4 => {
                let mut encoder = lz4::EncoderBuilder::new()
                    .level(1)
                    .build(archive_file)
                    .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                let (_output, result) = encoder.finish();
                result.map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::Tar => {
                do_archive_files(&mut archive_file)?;
            }
        };
    }

    // Atomically move the archive into position for other validators to find
    let metadata = fs::metadata(&staging_archive_path)
        .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
    let archive_path = archive_path.as_ref().to_path_buf();
    fs::rename(&staging_archive_path, &archive_path)
        .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;

    timer.stop();
    info!(
        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
        archive_path.display(),
        snapshot_slot,
        timer.as_ms(),
        metadata.len()
    );

    datapoint_info!(
        "archive-snapshot-package",
        ("slot", snapshot_slot, i64),
        ("archive_format", archive_format.to_string(), String),
        ("duration_ms", timer.as_ms(), i64),
        (
            if snapshot_kind.is_full_snapshot() {
                "full-snapshot-archive-size"
            } else {
                "incremental-snapshot-archive-size"
            },
            metadata.len(),
            i64
        ),
    );
    Ok(SnapshotArchiveInfo {
        path: archive_path,
        slot: snapshot_slot,
        hash: snapshot_hash,
        archive_format,
    })
}
1149
1150/// Get the bank snapshots in a directory
1151pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1152    let mut bank_snapshots = Vec::default();
1153    match fs::read_dir(&bank_snapshots_dir) {
1154        Err(err) => {
1155            info!(
1156                "Unable to read bank snapshots directory '{}': {err}",
1157                bank_snapshots_dir.as_ref().display(),
1158            );
1159        }
1160        Ok(paths) => paths
1161            .filter_map(|entry| {
1162                // check if this entry is a directory and only a Slot
1163                // bank snapshots are bank_snapshots_dir/slot/slot(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION)
1164                entry
1165                    .ok()
1166                    .filter(|entry| entry.path().is_dir())
1167                    .and_then(|entry| {
1168                        entry
1169                            .path()
1170                            .file_name()
1171                            .and_then(|file_name| file_name.to_str())
1172                            .and_then(|file_name| file_name.parse::<Slot>().ok())
1173                    })
1174            })
1175            .for_each(
1176                |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1177                    Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1178                    // Other threads may be modifying bank snapshots in parallel; only return
1179                    // snapshots that are complete as deemed by BankSnapshotInfo::new_from_dir()
1180                    Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1181                },
1182            ),
1183    }
1184    bank_snapshots
1185}
1186
1187/// Get the bank snapshots in a directory
1188///
1189/// This function retains only the bank snapshots of kind BankSnapshotKind::Pre
1190pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1191    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1192    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1193    bank_snapshots
1194}
1195
1196/// Get the bank snapshots in a directory
1197///
1198/// This function retains only the bank snapshots of kind BankSnapshotKind::Post
1199pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1200    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1201    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1202    bank_snapshots
1203}
1204
1205/// Get the bank snapshot with the highest slot in a directory
1206///
1207/// This function gets the highest bank snapshot of kind BankSnapshotKind::Pre
1208pub fn get_highest_bank_snapshot_pre(
1209    bank_snapshots_dir: impl AsRef<Path>,
1210) -> Option<BankSnapshotInfo> {
1211    do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1212}
1213
1214/// Get the bank snapshot with the highest slot in a directory
1215///
1216/// This function gets the highest bank snapshot of kind BankSnapshotKind::Post
1217pub fn get_highest_bank_snapshot_post(
1218    bank_snapshots_dir: impl AsRef<Path>,
1219) -> Option<BankSnapshotInfo> {
1220    do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1221}
1222
1223/// Get the bank snapshot with the highest slot in a directory
1224///
1225/// This function gets the highest bank snapshot of any kind
1226pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1227    do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1228}
1229
1230fn do_get_highest_bank_snapshot(
1231    mut bank_snapshots: Vec<BankSnapshotInfo>,
1232) -> Option<BankSnapshotInfo> {
1233    bank_snapshots.sort_unstable();
1234    bank_snapshots.into_iter().next_back()
1235}
1236
1237pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1238where
1239    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1240{
1241    serialize_snapshot_data_file_capped::<F>(
1242        data_file_path,
1243        MAX_SNAPSHOT_DATA_FILE_SIZE,
1244        serializer,
1245    )
1246}
1247
1248pub fn deserialize_snapshot_data_file<T: Sized>(
1249    data_file_path: &Path,
1250    deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1251) -> Result<T> {
1252    let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1253        deserializer(streams.full_snapshot_stream)
1254    };
1255
1256    let wrapped_data_file_path = SnapshotRootPaths {
1257        full_snapshot_root_file_path: data_file_path.to_path_buf(),
1258        incremental_snapshot_root_file_path: None,
1259    };
1260
1261    deserialize_snapshot_data_files_capped(
1262        &wrapped_data_file_path,
1263        MAX_SNAPSHOT_DATA_FILE_SIZE,
1264        wrapped_deserializer,
1265    )
1266}
1267
1268pub fn deserialize_snapshot_data_files<T: Sized>(
1269    snapshot_root_paths: &SnapshotRootPaths,
1270    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1271) -> Result<T> {
1272    deserialize_snapshot_data_files_capped(
1273        snapshot_root_paths,
1274        MAX_SNAPSHOT_DATA_FILE_SIZE,
1275        deserializer,
1276    )
1277}
1278
1279fn serialize_snapshot_data_file_capped<F>(
1280    data_file_path: &Path,
1281    maximum_file_size: u64,
1282    serializer: F,
1283) -> Result<u64>
1284where
1285    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1286{
1287    let data_file = fs::File::create(data_file_path)?;
1288    let mut data_file_stream = BufWriter::new(data_file);
1289    serializer(&mut data_file_stream)?;
1290    data_file_stream.flush()?;
1291
1292    let consumed_size = data_file_stream.stream_position()?;
1293    if consumed_size > maximum_file_size {
1294        let error_message = format!(
1295            "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1296            data_file_path.display(),
1297        );
1298        return Err(IoError::other(error_message).into());
1299    }
1300    Ok(consumed_size)
1301}
1302
1303fn deserialize_snapshot_data_files_capped<T: Sized>(
1304    snapshot_root_paths: &SnapshotRootPaths,
1305    maximum_file_size: u64,
1306    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1307) -> Result<T> {
1308    let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1309        create_snapshot_data_file_stream(
1310            &snapshot_root_paths.full_snapshot_root_file_path,
1311            maximum_file_size,
1312        )?;
1313
1314    let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1315        if let Some(ref incremental_snapshot_root_file_path) =
1316            snapshot_root_paths.incremental_snapshot_root_file_path
1317        {
1318            Some(create_snapshot_data_file_stream(
1319                incremental_snapshot_root_file_path,
1320                maximum_file_size,
1321            )?)
1322        } else {
1323            None
1324        }
1325        .unzip();
1326
1327    let mut snapshot_streams = SnapshotStreams {
1328        full_snapshot_stream: &mut full_snapshot_data_file_stream,
1329        incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1330    };
1331    let ret = deserializer(&mut snapshot_streams)?;
1332
1333    check_deserialize_file_consumed(
1334        full_snapshot_file_size,
1335        &snapshot_root_paths.full_snapshot_root_file_path,
1336        &mut full_snapshot_data_file_stream,
1337    )?;
1338
1339    if let Some(ref incremental_snapshot_root_file_path) =
1340        snapshot_root_paths.incremental_snapshot_root_file_path
1341    {
1342        check_deserialize_file_consumed(
1343            incremental_snapshot_file_size.unwrap(),
1344            incremental_snapshot_root_file_path,
1345            incremental_snapshot_data_file_stream.as_mut().unwrap(),
1346        )?;
1347    }
1348
1349    Ok(ret)
1350}
1351
1352/// Before running the deserializer function, perform common operations on the snapshot archive
1353/// files, such as checking the file size and opening the file into a stream.
1354fn create_snapshot_data_file_stream(
1355    snapshot_root_file_path: impl AsRef<Path>,
1356    maximum_file_size: u64,
1357) -> Result<(u64, BufReader<std::fs::File>)> {
1358    let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1359
1360    if snapshot_file_size > maximum_file_size {
1361        let error_message = format!(
1362            "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1363            snapshot_root_file_path.as_ref().display(),
1364            snapshot_file_size,
1365            maximum_file_size,
1366        );
1367        return Err(IoError::other(error_message).into());
1368    }
1369
1370    let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1371    let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1372
1373    Ok((snapshot_file_size, snapshot_data_file_stream))
1374}
1375
1376/// After running the deserializer function, perform common checks to ensure the snapshot archive
1377/// files were consumed correctly.
1378fn check_deserialize_file_consumed(
1379    file_size: u64,
1380    file_path: impl AsRef<Path>,
1381    file_stream: &mut BufReader<std::fs::File>,
1382) -> Result<()> {
1383    let consumed_size = file_stream.stream_position()?;
1384
1385    if consumed_size != file_size {
1386        let error_message = format!(
1387            "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1388            file_path.as_ref().display(),
1389            file_size,
1390            consumed_size,
1391        );
1392        return Err(IoError::other(error_message).into());
1393    }
1394
1395    Ok(())
1396}
1397
1398/// Return account path from the appendvec path after checking its format.
1399fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1400    let run_path = appendvec_path.parent()?;
1401    let run_file_name = run_path.file_name()?;
1402    // All appendvec files should be under <account_path>/run/.
1403    // When generating the bank snapshot directory, they are hardlinked to <account_path>/snapshot/<slot>/
1404    if run_file_name != ACCOUNTS_RUN_DIR {
1405        error!(
1406            "The account path {} does not have run/ as its immediate parent directory.",
1407            run_path.display()
1408        );
1409        return None;
1410    }
1411    let account_path = run_path.parent()?;
1412    Some(account_path.to_path_buf())
1413}
1414
1415/// From an appendvec path, derive the snapshot hardlink path.  If the corresponding snapshot hardlink
1416/// directory does not exist, create it.
1417fn get_snapshot_accounts_hardlink_dir(
1418    appendvec_path: &Path,
1419    bank_slot: Slot,
1420    account_paths: &mut HashSet<PathBuf>,
1421    hardlinks_dir: impl AsRef<Path>,
1422) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1423    let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1424        GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1425    })?;
1426
1427    let snapshot_hardlink_dir = account_path
1428        .join(ACCOUNTS_SNAPSHOT_DIR)
1429        .join(bank_slot.to_string());
1430
1431    // Use the hashset to track, to avoid checking the file system.  Only set up the hardlink directory
1432    // and the symlink to it at the first time of seeing the account_path.
1433    if !account_paths.contains(&account_path) {
1434        let idx = account_paths.len();
1435        debug!(
1436            "for appendvec_path {}, create hard-link path {}",
1437            appendvec_path.display(),
1438            snapshot_hardlink_dir.display()
1439        );
1440        fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1441            GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1442                err,
1443                snapshot_hardlink_dir.clone(),
1444            )
1445        })?;
1446        let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1447        symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1448            GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1449                source: err,
1450                original: snapshot_hardlink_dir.clone(),
1451                link: symlink_path,
1452            }
1453        })?;
1454        account_paths.insert(account_path);
1455    };
1456
1457    Ok(snapshot_hardlink_dir)
1458}
1459
1460/// Hard-link the files from accounts/ to snapshot/<bank_slot>/accounts/
1461/// This keeps the appendvec files alive and with the bank snapshot.  The slot and id
1462/// in the file names are also updated in case its file is a recycled one with inconsistent slot
1463/// and id.
1464pub fn hard_link_storages_to_snapshot(
1465    bank_snapshot_dir: impl AsRef<Path>,
1466    bank_slot: Slot,
1467    snapshot_storages: &[Arc<AccountStorageEntry>],
1468) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1469    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1470    fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1471        HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1472            err,
1473            accounts_hardlinks_dir.clone(),
1474        )
1475    })?;
1476
1477    let mut account_paths: HashSet<PathBuf> = HashSet::new();
1478    for storage in snapshot_storages {
1479        let storage_path = storage.accounts.path();
1480        let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1481            storage_path,
1482            bank_slot,
1483            &mut account_paths,
1484            &accounts_hardlinks_dir,
1485        )?;
1486        // The appendvec could be recycled, so its filename may not be consistent to the slot and id.
1487        // Use the storage slot and id to compose a consistent file name for the hard-link file.
1488        let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1489        let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1490        fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1491            HardLinkStoragesToSnapshotError::HardLinkStorage(
1492                err,
1493                storage_path.to_path_buf(),
1494                hard_link_path,
1495            )
1496        })?;
1497    }
1498    Ok(())
1499}
1500
1501/// serializing needs Vec<Vec<Arc<AccountStorageEntry>>>, but data structure at runtime is Vec<Arc<AccountStorageEntry>>
1502/// translates to what we need
1503pub(crate) fn get_storages_to_serialize(
1504    snapshot_storages: &[Arc<AccountStorageEntry>],
1505) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1506    snapshot_storages
1507        .iter()
1508        .map(|storage| vec![Arc::clone(storage)])
1509        .collect::<Vec<_>>()
1510}
1511
// Upper bound on the number of parallel untar readers; `verify_and_unarchive_snapshots` clamps
// its reader count to `1..=PARALLEL_UNTAR_READERS_DEFAULT`. From testing, 4 seems to be a sweet
// spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1514
1515/// Unarchives the given full and incremental snapshot archives, as long as they are compatible.
1516pub fn verify_and_unarchive_snapshots(
1517    bank_snapshots_dir: impl AsRef<Path>,
1518    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1519    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1520    account_paths: &[PathBuf],
1521    storage_access: StorageAccess,
1522) -> Result<(
1523    UnarchivedSnapshot,
1524    Option<UnarchivedSnapshot>,
1525    AtomicAccountsFileId,
1526)> {
1527    check_are_snapshots_compatible(
1528        full_snapshot_archive_info,
1529        incremental_snapshot_archive_info,
1530    )?;
1531
1532    let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1533
1534    let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1535    let unarchived_full_snapshot = unarchive_snapshot(
1536        &bank_snapshots_dir,
1537        TMP_SNAPSHOT_ARCHIVE_PREFIX,
1538        full_snapshot_archive_info.path(),
1539        "snapshot untar",
1540        account_paths,
1541        full_snapshot_archive_info.archive_format(),
1542        parallel_divisions,
1543        next_append_vec_id.clone(),
1544        storage_access,
1545    )?;
1546
1547    let unarchived_incremental_snapshot =
1548        if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1549            let unarchived_incremental_snapshot = unarchive_snapshot(
1550                &bank_snapshots_dir,
1551                TMP_SNAPSHOT_ARCHIVE_PREFIX,
1552                incremental_snapshot_archive_info.path(),
1553                "incremental snapshot untar",
1554                account_paths,
1555                incremental_snapshot_archive_info.archive_format(),
1556                parallel_divisions,
1557                next_append_vec_id.clone(),
1558                storage_access,
1559            )?;
1560            Some(unarchived_incremental_snapshot)
1561        } else {
1562            None
1563        };
1564
1565    Ok((
1566        unarchived_full_snapshot,
1567        unarchived_incremental_snapshot,
1568        Arc::try_unwrap(next_append_vec_id).unwrap(),
1569    ))
1570}
1571
1572/// Spawns a thread for unpacking a snapshot
1573fn spawn_unpack_snapshot_thread(
1574    file_sender: Sender<PathBuf>,
1575    account_paths: Arc<Vec<PathBuf>>,
1576    ledger_dir: Arc<PathBuf>,
1577    mut archive: Archive<SharedBufferReader>,
1578    parallel_selector: Option<ParallelSelector>,
1579    thread_index: usize,
1580) -> JoinHandle<()> {
1581    Builder::new()
1582        .name(format!("solUnpkSnpsht{thread_index:02}"))
1583        .spawn(move || {
1584            hardened_unpack::streaming_unpack_snapshot(
1585                &mut archive,
1586                ledger_dir.as_path(),
1587                &account_paths,
1588                parallel_selector,
1589                &file_sender,
1590            )
1591            .unwrap();
1592        })
1593        .unwrap()
1594}
1595
1596/// Streams unpacked files across channel
1597fn streaming_unarchive_snapshot(
1598    file_sender: Sender<PathBuf>,
1599    account_paths: Vec<PathBuf>,
1600    ledger_dir: PathBuf,
1601    snapshot_archive_path: PathBuf,
1602    archive_format: ArchiveFormat,
1603    num_threads: usize,
1604) -> Vec<JoinHandle<()>> {
1605    let account_paths = Arc::new(account_paths);
1606    let ledger_dir = Arc::new(ledger_dir);
1607    let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
1608
1609    // All shared buffer readers need to be created before the threads are spawned
1610    let archives: Vec<_> = (0..num_threads)
1611        .map(|_| {
1612            let reader = SharedBufferReader::new(&shared_buffer);
1613            Archive::new(reader)
1614        })
1615        .collect();
1616
1617    archives
1618        .into_iter()
1619        .enumerate()
1620        .map(|(thread_index, archive)| {
1621            let parallel_selector = Some(ParallelSelector {
1622                index: thread_index,
1623                divisions: num_threads,
1624            });
1625
1626            spawn_unpack_snapshot_thread(
1627                file_sender.clone(),
1628                account_paths.clone(),
1629                ledger_dir.clone(),
1630                archive,
1631                parallel_selector,
1632                thread_index,
1633            )
1634        })
1635        .collect()
1636}
1637
1638/// BankSnapshotInfo::new_from_dir() requires a few meta files to accept a snapshot dir
1639/// as a valid one.  A dir unpacked from an archive lacks these files.  Fill them here to
1640/// allow new_from_dir() checks to pass.  These checks are not needed for unpacked dirs,
1641/// but it is not clean to add another flag to new_from_dir() to skip them.
1642fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1643    let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1644    if !snapshots_dir.is_dir() {
1645        return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1646    }
1647
1648    // The unpacked dir has a single slot dir, which is the snapshot slot dir.
1649    let slot_dir = std::fs::read_dir(&snapshots_dir)
1650        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1651        .find(|entry| entry.as_ref().unwrap().path().is_dir())
1652        .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1653        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1654        .path();
1655
1656    let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1657    fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1658
1659    let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1660    fs::hard_link(
1661        status_cache_file,
1662        slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1663    )?;
1664
1665    let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
1666    fs::File::create(state_complete_file)?;
1667
1668    Ok(())
1669}
1670
1671/// Perform the common tasks when unarchiving a snapshot.  Handles creating the temporary
1672/// directories, untaring, reading the version file, and then returning those fields plus the
1673/// rebuilt storage
1674fn unarchive_snapshot(
1675    bank_snapshots_dir: impl AsRef<Path>,
1676    unpacked_snapshots_dir_prefix: &'static str,
1677    snapshot_archive_path: impl AsRef<Path>,
1678    measure_name: &'static str,
1679    account_paths: &[PathBuf],
1680    archive_format: ArchiveFormat,
1681    parallel_divisions: usize,
1682    next_append_vec_id: Arc<AtomicAccountsFileId>,
1683    storage_access: StorageAccess,
1684) -> Result<UnarchivedSnapshot> {
1685    let unpack_dir = tempfile::Builder::new()
1686        .prefix(unpacked_snapshots_dir_prefix)
1687        .tempdir_in(bank_snapshots_dir)?;
1688    let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
1689
1690    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1691    streaming_unarchive_snapshot(
1692        file_sender,
1693        account_paths.to_vec(),
1694        unpack_dir.path().to_path_buf(),
1695        snapshot_archive_path.as_ref().to_path_buf(),
1696        archive_format,
1697        parallel_divisions,
1698    );
1699
1700    let num_rebuilder_threads = num_cpus::get_physical()
1701        .saturating_sub(parallel_divisions)
1702        .max(1);
1703    let (version_and_storages, measure_untar) = measure_time!(
1704        SnapshotStorageRebuilder::rebuild_storage(
1705            file_receiver,
1706            num_rebuilder_threads,
1707            next_append_vec_id,
1708            SnapshotFrom::Archive,
1709            storage_access,
1710        )?,
1711        measure_name
1712    );
1713    info!("{}", measure_untar);
1714
1715    create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
1716
1717    let RebuiltSnapshotStorage {
1718        snapshot_version,
1719        storage,
1720    } = version_and_storages;
1721    Ok(UnarchivedSnapshot {
1722        unpack_dir,
1723        storage,
1724        unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
1725            unpacked_snapshots_dir,
1726            snapshot_version,
1727        },
1728        measure_untar,
1729    })
1730}
1731
1732/// Streams snapshot dir files across channel
1733/// Follow the flow of streaming_unarchive_snapshot(), but handle the from_dir case.
1734fn streaming_snapshot_dir_files(
1735    file_sender: Sender<PathBuf>,
1736    snapshot_file_path: impl Into<PathBuf>,
1737    snapshot_version_path: impl Into<PathBuf>,
1738    account_paths: &[PathBuf],
1739) -> Result<()> {
1740    file_sender.send(snapshot_file_path.into())?;
1741    file_sender.send(snapshot_version_path.into())?;
1742
1743    for account_path in account_paths {
1744        for file in fs::read_dir(account_path)? {
1745            file_sender.send(file?.path())?;
1746        }
1747    }
1748
1749    Ok(())
1750}
1751
1752/// Performs the common tasks when deserializing a snapshot
1753///
1754/// Handles reading the snapshot file and version file,
1755/// then returning those fields plus the rebuilt storages.
1756pub fn rebuild_storages_from_snapshot_dir(
1757    snapshot_info: &BankSnapshotInfo,
1758    account_paths: &[PathBuf],
1759    next_append_vec_id: Arc<AtomicAccountsFileId>,
1760    storage_access: StorageAccess,
1761) -> Result<AccountStorageMap> {
1762    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
1763    let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1764    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
1765
1766    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
1767        IoError::other(format!(
1768            "failed to read accounts hardlinks dir '{}': {err}",
1769            accounts_hardlinks.display(),
1770        ))
1771    })?;
1772    for dir_entry in read_dir {
1773        let symlink_path = dir_entry?.path();
1774        // The symlink point to <account_path>/snapshot/<slot> which contain the account files hardlinks
1775        // The corresponding run path should be <account_path>/run/
1776        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
1777            IoError::other(format!(
1778                "failed to read symlink '{}': {err}",
1779                symlink_path.display(),
1780            ))
1781        })?;
1782        let account_run_path = account_snapshot_path
1783            .parent()
1784            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1785            .parent()
1786            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1787            .join(ACCOUNTS_RUN_DIR);
1788        if !account_run_paths.contains(&account_run_path) {
1789            // The appendvec from the bank snapshot storage does not match any of the provided account_paths set.
1790            // The accout paths have changed so the snapshot is no longer usable.
1791            return Err(SnapshotError::AccountPathsMismatch);
1792        }
1793        // Generate hard-links to make the account files available in the main accounts/, and let the new appendvec
1794        // paths be in accounts/
1795        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
1796            IoError::other(format!(
1797                "failed to read account snapshot dir '{}': {err}",
1798                account_snapshot_path.display(),
1799            ))
1800        })?;
1801        for file in read_dir {
1802            let file_path = file?.path();
1803            let file_name = file_path
1804                .file_name()
1805                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
1806            let dest_path = account_run_path.join(file_name);
1807            fs::hard_link(&file_path, &dest_path).map_err(|err| {
1808                IoError::other(format!(
1809                    "failed to hard link from '{}' to '{}': {err}",
1810                    file_path.display(),
1811                    dest_path.display(),
1812                ))
1813            })?;
1814        }
1815    }
1816
1817    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1818    let snapshot_file_path = &snapshot_info.snapshot_path();
1819    let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1820    streaming_snapshot_dir_files(
1821        file_sender,
1822        snapshot_file_path,
1823        snapshot_version_path,
1824        account_paths,
1825    )?;
1826
1827    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
1828    let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
1829        file_receiver,
1830        num_rebuilder_threads,
1831        next_append_vec_id,
1832        SnapshotFrom::Dir,
1833        storage_access,
1834    )?;
1835
1836    let RebuiltSnapshotStorage {
1837        snapshot_version: _,
1838        storage,
1839    } = version_and_storages;
1840    Ok(storage)
1841}
1842
1843/// Reads the `snapshot_version` from a file. Before opening the file, its size
1844/// is compared to `MAX_SNAPSHOT_VERSION_FILE_SIZE`. If the size exceeds this
1845/// threshold, it is not opened and an error is returned.
1846fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
1847    // Check file size.
1848    let file_metadata = fs::metadata(&path).map_err(|err| {
1849        IoError::other(format!(
1850            "failed to query snapshot version file metadata '{}': {err}",
1851            path.as_ref().display(),
1852        ))
1853    })?;
1854    let file_size = file_metadata.len();
1855    if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1856        let error_message = format!(
1857            "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1858            path.as_ref().display(),
1859            file_size,
1860            MAX_SNAPSHOT_VERSION_FILE_SIZE,
1861        );
1862        return Err(IoError::other(error_message).into());
1863    }
1864
1865    // Read snapshot_version from file.
1866    let mut snapshot_version = String::new();
1867    let mut file = fs::File::open(&path).map_err(|err| {
1868        IoError::other(format!(
1869            "failed to open snapshot version file '{}': {err}",
1870            path.as_ref().display()
1871        ))
1872    })?;
1873    file.read_to_string(&mut snapshot_version).map_err(|err| {
1874        IoError::other(format!(
1875            "failed to read snapshot version from file '{}': {err}",
1876            path.as_ref().display()
1877        ))
1878    })?;
1879
1880    Ok(snapshot_version.trim().to_string())
1881}
1882
1883/// Check if an incremental snapshot is compatible with a full snapshot.  This is done by checking
1884/// if the incremental snapshot's base slot is the same as the full snapshot's slot.
1885fn check_are_snapshots_compatible(
1886    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1887    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1888) -> Result<()> {
1889    if incremental_snapshot_archive_info.is_none() {
1890        return Ok(());
1891    }
1892
1893    let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1894
1895    (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1896        .then_some(())
1897        .ok_or_else(|| {
1898            SnapshotError::MismatchedBaseSlot(
1899                full_snapshot_archive_info.slot(),
1900                incremental_snapshot_archive_info.base_slot(),
1901            )
1902        })
1903}
1904
1905/// Get the `&str` from a `&Path`
1906pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
1907    path.file_name()
1908        .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
1909        .to_str()
1910        .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
1911}
1912
1913pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
1914    snapshot_archives_dir
1915        .as_ref()
1916        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
1917}
1918
1919/// Build the full snapshot archive path from its components: the snapshot archives directory, the
1920/// snapshot slot, the accounts hash, and the archive format.
1921pub fn build_full_snapshot_archive_path(
1922    full_snapshot_archives_dir: impl AsRef<Path>,
1923    slot: Slot,
1924    hash: &SnapshotHash,
1925    archive_format: ArchiveFormat,
1926) -> PathBuf {
1927    full_snapshot_archives_dir.as_ref().join(format!(
1928        "snapshot-{}-{}.{}",
1929        slot,
1930        hash.0,
1931        archive_format.extension(),
1932    ))
1933}
1934
1935/// Build the incremental snapshot archive path from its components: the snapshot archives
1936/// directory, the snapshot base slot, the snapshot slot, the accounts hash, and the archive
1937/// format.
1938pub fn build_incremental_snapshot_archive_path(
1939    incremental_snapshot_archives_dir: impl AsRef<Path>,
1940    base_slot: Slot,
1941    slot: Slot,
1942    hash: &SnapshotHash,
1943    archive_format: ArchiveFormat,
1944) -> PathBuf {
1945    incremental_snapshot_archives_dir.as_ref().join(format!(
1946        "incremental-snapshot-{}-{}-{}.{}",
1947        base_slot,
1948        slot,
1949        hash.0,
1950        archive_format.extension(),
1951    ))
1952}
1953
1954/// Parse a full snapshot archive filename into its Slot, Hash, and Archive Format
1955pub(crate) fn parse_full_snapshot_archive_filename(
1956    archive_filename: &str,
1957) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
1958    lazy_static! {
1959        static ref RE: Regex = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1960    }
1961
1962    let do_parse = || {
1963        RE.captures(archive_filename).and_then(|captures| {
1964            let slot = captures
1965                .name("slot")
1966                .map(|x| x.as_str().parse::<Slot>())?
1967                .ok()?;
1968            let hash = captures
1969                .name("hash")
1970                .map(|x| x.as_str().parse::<Hash>())?
1971                .ok()?;
1972            let archive_format = captures
1973                .name("ext")
1974                .map(|x| x.as_str().parse::<ArchiveFormat>())?
1975                .ok()?;
1976
1977            Some((slot, SnapshotHash(hash), archive_format))
1978        })
1979    };
1980
1981    do_parse().ok_or_else(|| {
1982        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
1983    })
1984}
1985
1986/// Parse an incremental snapshot archive filename into its base Slot, actual Slot, Hash, and Archive Format
1987pub(crate) fn parse_incremental_snapshot_archive_filename(
1988    archive_filename: &str,
1989) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
1990    lazy_static! {
1991        static ref RE: Regex = Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1992    }
1993
1994    let do_parse = || {
1995        RE.captures(archive_filename).and_then(|captures| {
1996            let base_slot = captures
1997                .name("base")
1998                .map(|x| x.as_str().parse::<Slot>())?
1999                .ok()?;
2000            let slot = captures
2001                .name("slot")
2002                .map(|x| x.as_str().parse::<Slot>())?
2003                .ok()?;
2004            let hash = captures
2005                .name("hash")
2006                .map(|x| x.as_str().parse::<Hash>())?
2007                .ok()?;
2008            let archive_format = captures
2009                .name("ext")
2010                .map(|x| x.as_str().parse::<ArchiveFormat>())?
2011                .ok()?;
2012
2013            Some((base_slot, slot, SnapshotHash(hash), archive_format))
2014        })
2015    };
2016
2017    do_parse().ok_or_else(|| {
2018        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2019    })
2020}
2021
2022/// Walk down the snapshot archive to collect snapshot archive file info
2023fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2024where
2025    F: Fn(PathBuf) -> Result<T>,
2026{
2027    let walk_dir = |dir: &Path| -> Vec<T> {
2028        let entry_iter = fs::read_dir(dir);
2029        match entry_iter {
2030            Err(err) => {
2031                info!(
2032                    "Unable to read snapshot archives directory '{}': {err}",
2033                    dir.display(),
2034                );
2035                vec![]
2036            }
2037            Ok(entries) => entries
2038                .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2039                .collect(),
2040        }
2041    };
2042
2043    let mut ret = walk_dir(snapshot_archives_dir);
2044    let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2045    if remote_dir.exists() {
2046        ret.append(&mut walk_dir(remote_dir.as_ref()));
2047    }
2048    ret
2049}
2050
2051/// Get a list of the full snapshot archives from a directory
2052pub fn get_full_snapshot_archives(
2053    full_snapshot_archives_dir: impl AsRef<Path>,
2054) -> Vec<FullSnapshotArchiveInfo> {
2055    get_snapshot_archives(
2056        full_snapshot_archives_dir.as_ref(),
2057        FullSnapshotArchiveInfo::new_from_path,
2058    )
2059}
2060
2061/// Get a list of the incremental snapshot archives from a directory
2062pub fn get_incremental_snapshot_archives(
2063    incremental_snapshot_archives_dir: impl AsRef<Path>,
2064) -> Vec<IncrementalSnapshotArchiveInfo> {
2065    get_snapshot_archives(
2066        incremental_snapshot_archives_dir.as_ref(),
2067        IncrementalSnapshotArchiveInfo::new_from_path,
2068    )
2069}
2070
2071/// Get the highest slot of the full snapshot archives in a directory
2072pub fn get_highest_full_snapshot_archive_slot(
2073    full_snapshot_archives_dir: impl AsRef<Path>,
2074) -> Option<Slot> {
2075    get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2076        .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2077}
2078
2079/// Get the highest slot of the incremental snapshot archives in a directory, for a given full
2080/// snapshot slot
2081pub fn get_highest_incremental_snapshot_archive_slot(
2082    incremental_snapshot_archives_dir: impl AsRef<Path>,
2083    full_snapshot_slot: Slot,
2084) -> Option<Slot> {
2085    get_highest_incremental_snapshot_archive_info(
2086        incremental_snapshot_archives_dir,
2087        full_snapshot_slot,
2088    )
2089    .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2090}
2091
2092/// Get the path (and metadata) for the full snapshot archive with the highest slot in a directory
2093pub fn get_highest_full_snapshot_archive_info(
2094    full_snapshot_archives_dir: impl AsRef<Path>,
2095) -> Option<FullSnapshotArchiveInfo> {
2096    let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2097    full_snapshot_archives.sort_unstable();
2098    full_snapshot_archives.into_iter().next_back()
2099}
2100
2101/// Get the path for the incremental snapshot archive with the highest slot, for a given full
2102/// snapshot slot, in a directory
2103pub fn get_highest_incremental_snapshot_archive_info(
2104    incremental_snapshot_archives_dir: impl AsRef<Path>,
2105    full_snapshot_slot: Slot,
2106) -> Option<IncrementalSnapshotArchiveInfo> {
2107    // Since we want to filter down to only the incremental snapshot archives that have the same
2108    // full snapshot slot as the value passed in, perform the filtering before sorting to avoid
2109    // doing unnecessary work.
2110    let mut incremental_snapshot_archives =
2111        get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2112            .into_iter()
2113            .filter(|incremental_snapshot_archive_info| {
2114                incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2115            })
2116            .collect::<Vec<_>>();
2117    incremental_snapshot_archives.sort_unstable();
2118    incremental_snapshot_archives.into_iter().next_back()
2119}
2120
2121pub fn purge_old_snapshot_archives(
2122    full_snapshot_archives_dir: impl AsRef<Path>,
2123    incremental_snapshot_archives_dir: impl AsRef<Path>,
2124    maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2125    maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2126) {
2127    info!(
2128        "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2129        full_snapshot_archives_dir.as_ref().display(),
2130        maximum_full_snapshot_archives_to_retain
2131    );
2132
2133    let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2134    full_snapshot_archives.sort_unstable();
2135    full_snapshot_archives.reverse();
2136
2137    let num_to_retain = full_snapshot_archives
2138        .len()
2139        .min(maximum_full_snapshot_archives_to_retain.get());
2140    trace!(
2141        "There are {} full snapshot archives, retaining {}",
2142        full_snapshot_archives.len(),
2143        num_to_retain,
2144    );
2145
2146    let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2147        if full_snapshot_archives.is_empty() {
2148            None
2149        } else {
2150            Some(full_snapshot_archives.split_at(num_to_retain))
2151        }
2152        .unwrap_or_default();
2153
2154    let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2155        .iter()
2156        .map(|ai| ai.slot())
2157        .collect::<HashSet<_>>();
2158
2159    fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2160        for path in archives.iter().map(|a| a.path()) {
2161            trace!("Removing snapshot archive: {}", path.display());
2162            let result = fs::remove_file(path);
2163            if let Err(err) = result {
2164                info!(
2165                    "Failed to remove snapshot archive '{}': {err}",
2166                    path.display()
2167                );
2168            }
2169        }
2170    }
2171    remove_archives(full_snapshot_archives_to_remove);
2172
2173    info!(
2174        "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2175        incremental_snapshot_archives_dir.as_ref().display(),
2176        maximum_incremental_snapshot_archives_to_retain
2177    );
2178    let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2179    for incremental_snapshot_archive in
2180        get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2181    {
2182        incremental_snapshot_archives_by_base_slot
2183            .entry(incremental_snapshot_archive.base_slot())
2184            .or_default()
2185            .push(incremental_snapshot_archive)
2186    }
2187
2188    let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2189    for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2190    {
2191        incremental_snapshot_archives.sort_unstable();
2192        let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2193            maximum_incremental_snapshot_archives_to_retain.get()
2194        } else {
2195            usize::from(retained_full_snapshot_slots.contains(&base_slot))
2196        };
2197        trace!(
2198            "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2199            incremental_snapshot_archives.len(),
2200            base_slot,
2201            incremental_snapshot_archives
2202                .len()
2203                .saturating_sub(num_to_retain),
2204        );
2205
2206        incremental_snapshot_archives.truncate(
2207            incremental_snapshot_archives
2208                .len()
2209                .saturating_sub(num_to_retain),
2210        );
2211        remove_archives(&incremental_snapshot_archives);
2212    }
2213}
2214
#[cfg(feature = "dev-context-only-utils")]
/// Unpacks a snapshot from `shared_buffer` using `parallel_divisions` rayon workers.
///
/// Each worker reads through its own `SharedBufferReader` and extracts the files its
/// `ParallelSelector` assigns to it. Returns the merged append-vec maps of all workers.
///
/// # Panics
/// Panics if `parallel_divisions` is 0.
///
/// # Errors
/// Returns the first worker error, in worker-index order.
fn unpack_snapshot_local(
    shared_buffer: SharedBuffer,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_divisions > 0);

    // Every reader has to exist before any of them starts reading.
    let mut readers = Vec::with_capacity(parallel_divisions);
    for _ in 0..parallel_divisions {
        readers.push(SharedBufferReader::new(&shared_buffer));
    }

    // Worker `index` is responsible for 1/parallel_divisions of all the files to extract.
    let per_worker_results: Vec<_> = readers
        .into_par_iter()
        .enumerate()
        .map(|(index, reader)| {
            let mut archive = Archive::new(reader);
            hardened_unpack::unpack_snapshot(
                &mut archive,
                ledger_dir,
                account_paths,
                Some(ParallelSelector {
                    index,
                    divisions: parallel_divisions,
                }),
            )
        })
        .collect();

    let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
    for worker_result in per_worker_results {
        unpacked_append_vec_map.extend(worker_result?);
    }
    Ok(unpacked_append_vec_map)
}
2255
2256fn untar_snapshot_create_shared_buffer(
2257    snapshot_tar: &Path,
2258    archive_format: ArchiveFormat,
2259) -> SharedBuffer {
2260    let open_file = || {
2261        fs::File::open(snapshot_tar)
2262            .map_err(|err| {
2263                IoError::other(format!(
2264                    "failed to open snapshot archive '{}': {err}",
2265                    snapshot_tar.display(),
2266                ))
2267            })
2268            .unwrap()
2269    };
2270    match archive_format {
2271        ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(BufReader::new(open_file()))),
2272        ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(BufReader::new(open_file()))),
2273        ArchiveFormat::TarZstd { .. } => SharedBuffer::new(
2274            zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
2275        ),
2276        ArchiveFormat::TarLz4 => {
2277            SharedBuffer::new(lz4::Decoder::new(BufReader::new(open_file())).unwrap())
2278        }
2279        ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
2280    }
2281}
2282
#[cfg(feature = "dev-context-only-utils")]
/// Decompresses `snapshot_tar` according to `archive_format` and unpacks it into `unpack_dir`
/// and `account_paths` with `parallel_divisions` workers.
///
/// Returns the resulting append-vec map.
fn untar_snapshot_in(
    snapshot_tar: impl AsRef<Path>,
    unpack_dir: &Path,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    let archive_buffer =
        untar_snapshot_create_shared_buffer(snapshot_tar.as_ref(), archive_format);
    unpack_snapshot_local(
        archive_buffer,
        unpack_dir,
        account_paths,
        parallel_divisions,
    )
}
2294
2295pub fn verify_unpacked_snapshots_dir_and_version(
2296    unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2297) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2298    info!(
2299        "snapshot version: {}",
2300        &unpacked_snapshots_dir_and_version.snapshot_version
2301    );
2302
2303    let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2304    let mut bank_snapshots =
2305        get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2306    if bank_snapshots.len() > 1 {
2307        return Err(IoError::other(format!(
2308            "invalid snapshot format: only one snapshot allowed, but found {}",
2309            bank_snapshots.len(),
2310        ))
2311        .into());
2312    }
2313    let root_paths = bank_snapshots.pop().ok_or_else(|| {
2314        IoError::other(format!(
2315            "no snapshots found in snapshots directory '{}'",
2316            unpacked_snapshots_dir_and_version
2317                .unpacked_snapshots_dir
2318                .display(),
2319        ))
2320    })?;
2321    Ok((snapshot_version, root_paths))
2322}
2323
2324/// Returns the file name of the bank snapshot for `slot`
2325pub fn get_snapshot_file_name(slot: Slot) -> String {
2326    slot.to_string()
2327}
2328
2329/// Constructs the path to the bank snapshot directory for `slot` within `bank_snapshots_dir`
2330pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2331    bank_snapshots_dir
2332        .as_ref()
2333        .join(get_snapshot_file_name(slot))
2334}
2335
/// Allows tests to specify what happened to the serialized bank format, which decides how
/// the serialized bank in a snapshot is compared.
#[derive(Clone, Copy, Debug)]
pub enum VerifyBank {
    /// The serialized bank is expected to be byte-identical to what it is compared against.
    Deterministic,
    /// The serialized bank was "reserialized" into a non-deterministic format, so both
    /// files are deserialized and the deserialized results are compared instead.
    NonDeterministic,
}
2345
/// Unpacks `snapshot_archive` (compressed as `archive_format`) into a temporary directory
/// and asserts that it matches `snapshots_to_verify`, including the account storage files
/// linked from the bank snapshot for `slot`.
///
/// This is a test/dev utility that MUTATES `snapshots_to_verify` so the two directory trees
/// become comparable. It may remove:
/// - the serialized bank file (for `VerifyBank::NonDeterministic`);
/// - the accounts hardlinks directory;
/// - the snapshot version file;
/// - the "state complete" file.
///
/// `verify_bank` selects how the serialized bank file is compared:
/// - `VerifyBank::Deterministic`: as a regular file in the directory diff.
/// - `VerifyBank::NonDeterministic`: both files are deserialized and compared with
///   `compare_two_serialized_banks`, then removed before the directory diff.
///
/// # Panics
///
/// Panics on any I/O error, if the expected files/directories are missing, or if the
/// unpacked archive does not match `snapshots_to_verify`.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    untar_snapshot_in(
        snapshot_archive,
        unpack_dir,
        &[unpack_account_dir.clone()],
        archive_format,
        1,
    )
    .unwrap();

    // Check snapshots are the same
    let unpacked_snapshots = unpack_dir.join("snapshots");

    // Since the unpack code collects all the appendvecs into one directory unpack_account_dir, we need to
    // collect all the appendvecs in account_paths/<slot>/snapshot/ into one directory for later comparison.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    // Create the directory if it doesn't exist
    fs::create_dir_all(&storages_to_verify).unwrap();

    let slot = slot.to_string();
    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);

    if let VerifyBank::NonDeterministic = verify_bank {
        // file contents may be different, but deserialized structs should be equal
        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
        let p2 = unpacked_snapshots.join(&slot).join(&slot);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
        // Both bank files were compared above; remove them so the byte-level directory
        // diff below does not trip over their (allowed) differences.
        fs::remove_file(p1).unwrap();
        fs::remove_file(p2).unwrap();
    }

    // The new status_cache file is inside the slot directory together with the snapshot file.
    // When unpacking an archive, the status_cache file from the archive is one level up, outside
    // the slot directory.
    // The unpacked status_cache file needs to be put back into the slot directory for the
    // directory comparison to pass.
    let existing_unpacked_status_cache_file =
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let new_unpacked_status_cache_file = unpacked_snapshots
        .join(&slot)
        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
    fs::rename(
        existing_unpacked_status_cache_file,
        new_unpacked_status_cache_file,
    )
    .unwrap();

    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        // This directory contains symlinks to all <account_path>/snapshot/<slot> directories.
        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
            // Copy all the files in link_dst_path into the storages_to_verify directory.
            for entry in fs::read_dir(&link_dst_path).unwrap() {
                let src_path = entry.unwrap().path();
                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
                fs::copy(src_path, dst_path).unwrap();
            }
        }
        // The unpacked archive has no hardlinks directory, so drop it before the diff.
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    // The version and "state complete" files are not present in the unpacked snapshot
    // directory compared below, so remove them from the side being verified.
    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_path.is_file() {
        fs::remove_file(version_path).unwrap();
    }

    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_path.is_file() {
        fs::remove_file(state_complete_path).unwrap();
    }

    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // In the unarchiving case, there is an extra empty "accounts" directory. The account
    // files in the archive accounts/ have been expanded to [account_paths].
    // Remove the empty "accounts" directory for the directory comparison below.
    // In some test cases the directory to compare does not come from unarchiving.
    // Ignore the error when this directory does not exist.
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    // Check the account entries are the same
    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
2439
2440/// Purges all bank snapshots
2441pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2442    let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2443    purge_bank_snapshots(&bank_snapshots);
2444}
2445
2446/// Purges bank snapshots, retaining the newest `num_bank_snapshots_to_retain`
2447pub fn purge_old_bank_snapshots(
2448    bank_snapshots_dir: impl AsRef<Path>,
2449    num_bank_snapshots_to_retain: usize,
2450    filter_by_kind: Option<BankSnapshotKind>,
2451) {
2452    let mut bank_snapshots = match filter_by_kind {
2453        Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2454        Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2455        None => get_bank_snapshots(&bank_snapshots_dir),
2456    };
2457
2458    bank_snapshots.sort_unstable();
2459    purge_bank_snapshots(
2460        bank_snapshots
2461            .iter()
2462            .rev()
2463            .skip(num_bank_snapshots_to_retain),
2464    );
2465}
2466
2467/// At startup, purge old (i.e. unusable) bank snapshots
2468///
2469/// Only a single bank snapshot could be needed at startup (when using fast boot), so
2470/// retain the highest bank snapshot "post", and purge the rest.
2471pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2472    purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2473    purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2474
2475    let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2476    if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2477        debug!(
2478            "Retained bank snapshot for slot {}, and purged the rest.",
2479            highest_bank_snapshot_post.slot
2480        );
2481    }
2482}
2483
2484/// Purges bank snapshots that are older than `slot`
2485pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2486    let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2487    bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2488    purge_bank_snapshots(&bank_snapshots);
2489}
2490
2491/// Purges all `bank_snapshots`
2492///
2493/// Does not exit early if there is an error while purging a bank snapshot.
2494fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2495    for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2496        if purge_bank_snapshot(snapshot_dir).is_err() {
2497            warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2498        }
2499    }
2500}
2501
2502/// Remove the bank snapshot at this path
2503pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2504    const FN_ERR: &str = "failed to purge bank snapshot";
2505    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2506    if accounts_hardlinks_dir.is_dir() {
2507        // This directory contain symlinks to all accounts snapshot directories.
2508        // They should all be removed.
2509        let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2510            IoError::other(format!(
2511                "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2512                accounts_hardlinks_dir.display(),
2513            ))
2514        })?;
2515        for entry in read_dir {
2516            let accounts_hardlink_dir = entry?.path();
2517            let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2518                IoError::other(format!(
2519                    "{FN_ERR}: failed to read symlink '{}': {err}",
2520                    accounts_hardlink_dir.display(),
2521                ))
2522            })?;
2523            move_and_async_delete_path(&accounts_hardlink_dir);
2524        }
2525    }
2526    fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2527        IoError::other(format!(
2528            "{FN_ERR}: failed to remove dir '{}': {err}",
2529            bank_snapshot_dir.as_ref().display(),
2530        ))
2531    })?;
2532    Ok(())
2533}
2534
2535pub fn should_take_full_snapshot(
2536    block_height: Slot,
2537    full_snapshot_archive_interval_slots: Slot,
2538) -> bool {
2539    block_height % full_snapshot_archive_interval_slots == 0
2540}
2541
2542pub fn should_take_incremental_snapshot(
2543    block_height: Slot,
2544    incremental_snapshot_archive_interval_slots: Slot,
2545    latest_full_snapshot_slot: Option<Slot>,
2546) -> bool {
2547    block_height % incremental_snapshot_archive_interval_slots == 0
2548        && latest_full_snapshot_slot.is_some()
2549}
2550
/// Creates an "accounts path" directory for tests
///
/// The temporary directory contains the "run" and "snapshot" sub-directories required by a
/// validator. Returns the `TempDir` guard together with the path returned first by
/// `create_accounts_run_and_snapshot_dirs`. Keep the guard alive while the path is in use,
/// because dropping a `TempDir` deletes the directory.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = tempfile::TempDir::new().unwrap();
    let (accounts_dir, _) = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap();
    (tmp_dir, accounts_dir)
}
2561
2562#[cfg(test)]
2563mod tests {
2564    use {
2565        super::*,
2566        assert_matches::assert_matches,
2567        bincode::{deserialize_from, serialize_into},
2568        std::{convert::TryFrom, mem::size_of},
2569        tempfile::NamedTempFile,
2570    };
2571
2572    #[test]
2573    fn test_serialize_snapshot_data_file_under_limit() {
2574        let temp_dir = tempfile::TempDir::new().unwrap();
2575        let expected_consumed_size = size_of::<u32>() as u64;
2576        let consumed_size = serialize_snapshot_data_file_capped(
2577            &temp_dir.path().join("data-file"),
2578            expected_consumed_size,
2579            |stream| {
2580                serialize_into(stream, &2323_u32)?;
2581                Ok(())
2582            },
2583        )
2584        .unwrap();
2585        assert_eq!(consumed_size, expected_consumed_size);
2586    }
2587
2588    #[test]
2589    fn test_serialize_snapshot_data_file_over_limit() {
2590        let temp_dir = tempfile::TempDir::new().unwrap();
2591        let expected_consumed_size = size_of::<u32>() as u64;
2592        let result = serialize_snapshot_data_file_capped(
2593            &temp_dir.path().join("data-file"),
2594            expected_consumed_size - 1,
2595            |stream| {
2596                serialize_into(stream, &2323_u32)?;
2597                Ok(())
2598            },
2599        );
2600        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2601    }
2602
2603    #[test]
2604    fn test_deserialize_snapshot_data_file_under_limit() {
2605        let expected_data = 2323_u32;
2606        let expected_consumed_size = size_of::<u32>() as u64;
2607
2608        let temp_dir = tempfile::TempDir::new().unwrap();
2609        serialize_snapshot_data_file_capped(
2610            &temp_dir.path().join("data-file"),
2611            expected_consumed_size,
2612            |stream| {
2613                serialize_into(stream, &expected_data)?;
2614                Ok(())
2615            },
2616        )
2617        .unwrap();
2618
2619        let snapshot_root_paths = SnapshotRootPaths {
2620            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2621            incremental_snapshot_root_file_path: None,
2622        };
2623
2624        let actual_data = deserialize_snapshot_data_files_capped(
2625            &snapshot_root_paths,
2626            expected_consumed_size,
2627            |stream| {
2628                Ok(deserialize_from::<_, u32>(
2629                    &mut stream.full_snapshot_stream,
2630                )?)
2631            },
2632        )
2633        .unwrap();
2634        assert_eq!(actual_data, expected_data);
2635    }
2636
2637    #[test]
2638    fn test_deserialize_snapshot_data_file_over_limit() {
2639        let expected_data = 2323_u32;
2640        let expected_consumed_size = size_of::<u32>() as u64;
2641
2642        let temp_dir = tempfile::TempDir::new().unwrap();
2643        serialize_snapshot_data_file_capped(
2644            &temp_dir.path().join("data-file"),
2645            expected_consumed_size,
2646            |stream| {
2647                serialize_into(stream, &expected_data)?;
2648                Ok(())
2649            },
2650        )
2651        .unwrap();
2652
2653        let snapshot_root_paths = SnapshotRootPaths {
2654            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2655            incremental_snapshot_root_file_path: None,
2656        };
2657
2658        let result = deserialize_snapshot_data_files_capped(
2659            &snapshot_root_paths,
2660            expected_consumed_size - 1,
2661            |stream| {
2662                Ok(deserialize_from::<_, u32>(
2663                    &mut stream.full_snapshot_stream,
2664                )?)
2665            },
2666        );
2667        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2668    }
2669
2670    #[test]
2671    fn test_deserialize_snapshot_data_file_extra_data() {
2672        let expected_data = 2323_u32;
2673        let expected_consumed_size = size_of::<u32>() as u64;
2674
2675        let temp_dir = tempfile::TempDir::new().unwrap();
2676        serialize_snapshot_data_file_capped(
2677            &temp_dir.path().join("data-file"),
2678            expected_consumed_size * 2,
2679            |stream| {
2680                serialize_into(stream.by_ref(), &expected_data)?;
2681                serialize_into(stream.by_ref(), &expected_data)?;
2682                Ok(())
2683            },
2684        )
2685        .unwrap();
2686
2687        let snapshot_root_paths = SnapshotRootPaths {
2688            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2689            incremental_snapshot_root_file_path: None,
2690        };
2691
2692        let result = deserialize_snapshot_data_files_capped(
2693            &snapshot_root_paths,
2694            expected_consumed_size * 2,
2695            |stream| {
2696                Ok(deserialize_from::<_, u32>(
2697                    &mut stream.full_snapshot_stream,
2698                )?)
2699            },
2700        );
2701        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2702    }
2703
2704    #[test]
2705    fn test_snapshot_version_from_file_under_limit() {
2706        let file_content = SnapshotVersion::default().as_str();
2707        let mut file = NamedTempFile::new().unwrap();
2708        file.write_all(file_content.as_bytes()).unwrap();
2709        let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2710        assert_eq!(version_from_file, file_content);
2711    }
2712
2713    #[test]
2714    fn test_snapshot_version_from_file_over_limit() {
2715        let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2716        let file_content = vec![7u8; over_limit_size];
2717        let mut file = NamedTempFile::new().unwrap();
2718        file.write_all(&file_content).unwrap();
2719        assert_matches!(
2720            snapshot_version_from_file(file.path()),
2721            Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2722        );
2723    }
2724
2725    #[test]
2726    fn test_parse_full_snapshot_archive_filename() {
2727        assert_eq!(
2728            parse_full_snapshot_archive_filename(&format!(
2729                "snapshot-42-{}.tar.bz2",
2730                Hash::default()
2731            ))
2732            .unwrap(),
2733            (42, SnapshotHash(Hash::default()), ArchiveFormat::TarBzip2)
2734        );
2735        assert_eq!(
2736            parse_full_snapshot_archive_filename(&format!(
2737                "snapshot-43-{}.tar.zst",
2738                Hash::default()
2739            ))
2740            .unwrap(),
2741            (
2742                43,
2743                SnapshotHash(Hash::default()),
2744                ArchiveFormat::TarZstd {
2745                    config: ZstdConfig::default(),
2746                }
2747            )
2748        );
2749        assert_eq!(
2750            parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default()))
2751                .unwrap(),
2752            (44, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2753        );
2754        assert_eq!(
2755            parse_full_snapshot_archive_filename(&format!(
2756                "snapshot-45-{}.tar.lz4",
2757                Hash::default()
2758            ))
2759            .unwrap(),
2760            (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2761        );
2762
2763        assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2764        assert!(
2765            parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2766        );
2767
2768        assert!(
2769            parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
2770        );
2771        assert!(parse_full_snapshot_archive_filename(&format!(
2772            "snapshot-12345678-{}.bad!ext",
2773            Hash::new_unique()
2774        ))
2775        .is_err());
2776        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2777
2778        assert!(parse_full_snapshot_archive_filename(&format!(
2779            "snapshot-bad!slot-{}.bad!ext",
2780            Hash::new_unique()
2781        ))
2782        .is_err());
2783        assert!(parse_full_snapshot_archive_filename(&format!(
2784            "snapshot-12345678-{}.bad!ext",
2785            Hash::new_unique()
2786        ))
2787        .is_err());
2788        assert!(parse_full_snapshot_archive_filename(&format!(
2789            "snapshot-bad!slot-{}.tar",
2790            Hash::new_unique()
2791        ))
2792        .is_err());
2793
2794        assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err());
2795        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2796        assert!(parse_full_snapshot_archive_filename(&format!(
2797            "snapshot-bad!slot-{}.tar",
2798            Hash::new_unique()
2799        ))
2800        .is_err());
2801    }
2802
2803    #[test]
2804    fn test_parse_incremental_snapshot_archive_filename() {
2805        assert_eq!(
2806            parse_incremental_snapshot_archive_filename(&format!(
2807                "incremental-snapshot-42-123-{}.tar.bz2",
2808                Hash::default()
2809            ))
2810            .unwrap(),
2811            (
2812                42,
2813                123,
2814                SnapshotHash(Hash::default()),
2815                ArchiveFormat::TarBzip2
2816            )
2817        );
2818        assert_eq!(
2819            parse_incremental_snapshot_archive_filename(&format!(
2820                "incremental-snapshot-43-234-{}.tar.zst",
2821                Hash::default()
2822            ))
2823            .unwrap(),
2824            (
2825                43,
2826                234,
2827                SnapshotHash(Hash::default()),
2828                ArchiveFormat::TarZstd {
2829                    config: ZstdConfig::default(),
2830                }
2831            )
2832        );
2833        assert_eq!(
2834            parse_incremental_snapshot_archive_filename(&format!(
2835                "incremental-snapshot-44-345-{}.tar",
2836                Hash::default()
2837            ))
2838            .unwrap(),
2839            (44, 345, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2840        );
2841        assert_eq!(
2842            parse_incremental_snapshot_archive_filename(&format!(
2843                "incremental-snapshot-45-456-{}.tar.lz4",
2844                Hash::default()
2845            ))
2846            .unwrap(),
2847            (
2848                45,
2849                456,
2850                SnapshotHash(Hash::default()),
2851                ArchiveFormat::TarLz4
2852            )
2853        );
2854
2855        assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
2856        assert!(parse_incremental_snapshot_archive_filename(&format!(
2857            "snapshot-42-{}.tar",
2858            Hash::new_unique()
2859        ))
2860        .is_err());
2861        assert!(parse_incremental_snapshot_archive_filename(
2862            "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
2863        )
2864        .is_err());
2865
2866        assert!(parse_incremental_snapshot_archive_filename(&format!(
2867            "incremental-snapshot-bad!slot-56785678-{}.tar",
2868            Hash::new_unique()
2869        ))
2870        .is_err());
2871
2872        assert!(parse_incremental_snapshot_archive_filename(&format!(
2873            "incremental-snapshot-12345678-bad!slot-{}.tar",
2874            Hash::new_unique()
2875        ))
2876        .is_err());
2877
2878        assert!(parse_incremental_snapshot_archive_filename(
2879            "incremental-snapshot-12341234-56785678-bad!HASH.tar"
2880        )
2881        .is_err());
2882
2883        assert!(parse_incremental_snapshot_archive_filename(&format!(
2884            "incremental-snapshot-12341234-56785678-{}.bad!ext",
2885            Hash::new_unique()
2886        ))
2887        .is_err());
2888    }
2889
2890    #[test]
2891    fn test_check_are_snapshots_compatible() {
2892        let slot1: Slot = 1234;
2893        let slot2: Slot = 5678;
2894        let slot3: Slot = 999_999;
2895
2896        let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
2897            format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
2898        ))
2899        .unwrap();
2900
2901        assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
2902
2903        let incremental_snapshot_archive_info =
2904            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2905                "/dir/incremental-snapshot-{}-{}-{}.tar",
2906                slot1,
2907                slot2,
2908                Hash::new_unique()
2909            )))
2910            .unwrap();
2911
2912        assert!(check_are_snapshots_compatible(
2913            &full_snapshot_archive_info,
2914            Some(&incremental_snapshot_archive_info)
2915        )
2916        .is_ok());
2917
2918        let incremental_snapshot_archive_info =
2919            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2920                "/dir/incremental-snapshot-{}-{}-{}.tar",
2921                slot2,
2922                slot3,
2923                Hash::new_unique()
2924            )))
2925            .unwrap();
2926
2927        assert!(check_are_snapshots_compatible(
2928            &full_snapshot_archive_info,
2929            Some(&incremental_snapshot_archive_info)
2930        )
2931        .is_err());
2932    }
2933
2934    /// A test heler function that creates bank snapshot files
2935    fn common_create_bank_snapshot_files(
2936        bank_snapshots_dir: &Path,
2937        min_slot: Slot,
2938        max_slot: Slot,
2939    ) {
2940        for slot in min_slot..max_slot {
2941            let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
2942            fs::create_dir_all(&snapshot_dir).unwrap();
2943
2944            let snapshot_filename = get_snapshot_file_name(slot);
2945            let snapshot_path = snapshot_dir.join(snapshot_filename);
2946            fs::File::create(snapshot_path).unwrap();
2947
2948            let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2949            fs::File::create(status_cache_file).unwrap();
2950
2951            let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
2952            fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2953
2954            // Mark this directory complete so it can be used.  Check this flag first before selecting for deserialization.
2955            let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
2956            fs::File::create(state_complete_path).unwrap();
2957        }
2958    }
2959
2960    #[test]
2961    fn test_get_bank_snapshots() {
2962        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2963        let min_slot = 10;
2964        let max_slot = 20;
2965        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2966
2967        let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
2968        assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
2969    }
2970
2971    #[test]
2972    fn test_get_highest_bank_snapshot_post() {
2973        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2974        let min_slot = 99;
2975        let max_slot = 123;
2976        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2977
2978        let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
2979        assert!(highest_bank_snapshot.is_some());
2980        assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
2981    }
2982
2983    /// A test helper function that creates full and incremental snapshot archive files.  Creates
2984    /// full snapshot files in the range (`min_full_snapshot_slot`, `max_full_snapshot_slot`], and
2985    /// incremental snapshot files in the range (`min_incremental_snapshot_slot`,
2986    /// `max_incremental_snapshot_slot`].  Additionally, "bad" files are created for both full and
2987    /// incremental snapshots to ensure the tests properly filter them out.
2988    fn common_create_snapshot_archive_files(
2989        full_snapshot_archives_dir: &Path,
2990        incremental_snapshot_archives_dir: &Path,
2991        min_full_snapshot_slot: Slot,
2992        max_full_snapshot_slot: Slot,
2993        min_incremental_snapshot_slot: Slot,
2994        max_incremental_snapshot_slot: Slot,
2995    ) {
2996        fs::create_dir_all(full_snapshot_archives_dir).unwrap();
2997        fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
2998        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
2999            for incremental_snapshot_slot in
3000                min_incremental_snapshot_slot..max_incremental_snapshot_slot
3001            {
3002                let snapshot_filename = format!(
3003                    "incremental-snapshot-{}-{}-{}.tar",
3004                    full_snapshot_slot,
3005                    incremental_snapshot_slot,
3006                    Hash::default()
3007                );
3008                let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
3009                fs::File::create(snapshot_filepath).unwrap();
3010            }
3011
3012            let snapshot_filename =
3013                format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3014            let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3015            fs::File::create(snapshot_filepath).unwrap();
3016
3017            // Add in an incremental snapshot with a bad filename and high slot to ensure filename are filtered and sorted correctly
3018            let bad_filename = format!(
3019                "incremental-snapshot-{}-{}-bad!hash.tar",
3020                full_snapshot_slot,
3021                max_incremental_snapshot_slot + 1,
3022            );
3023            let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3024            fs::File::create(bad_filepath).unwrap();
3025        }
3026
3027        // Add in a snapshot with a bad filename and high slot to ensure filename are filtered and
3028        // sorted correctly
3029        let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
3030        let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3031        fs::File::create(bad_filepath).unwrap();
3032    }
3033
3034    #[test]
3035    fn test_get_full_snapshot_archives() {
3036        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3037        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3038        let min_slot = 123;
3039        let max_slot = 456;
3040        common_create_snapshot_archive_files(
3041            full_snapshot_archives_dir.path(),
3042            incremental_snapshot_archives_dir.path(),
3043            min_slot,
3044            max_slot,
3045            0,
3046            0,
3047        );
3048
3049        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3050        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3051    }
3052
3053    #[test]
3054    fn test_get_full_snapshot_archives_remote() {
3055        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3056        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3057        let min_slot = 123;
3058        let max_slot = 456;
3059        common_create_snapshot_archive_files(
3060            &full_snapshot_archives_dir
3061                .path()
3062                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3063            &incremental_snapshot_archives_dir
3064                .path()
3065                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3066            min_slot,
3067            max_slot,
3068            0,
3069            0,
3070        );
3071
3072        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3073        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3074        assert!(snapshot_archives.iter().all(|info| info.is_remote()));
3075    }
3076
3077    #[test]
3078    fn test_get_incremental_snapshot_archives() {
3079        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3080        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3081        let min_full_snapshot_slot = 12;
3082        let max_full_snapshot_slot = 23;
3083        let min_incremental_snapshot_slot = 34;
3084        let max_incremental_snapshot_slot = 45;
3085        common_create_snapshot_archive_files(
3086            full_snapshot_archives_dir.path(),
3087            incremental_snapshot_archives_dir.path(),
3088            min_full_snapshot_slot,
3089            max_full_snapshot_slot,
3090            min_incremental_snapshot_slot,
3091            max_incremental_snapshot_slot,
3092        );
3093
3094        let incremental_snapshot_archives =
3095            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3096        assert_eq!(
3097            incremental_snapshot_archives.len() as Slot,
3098            (max_full_snapshot_slot - min_full_snapshot_slot)
3099                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3100        );
3101    }
3102
3103    #[test]
3104    fn test_get_incremental_snapshot_archives_remote() {
3105        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3106        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3107        let min_full_snapshot_slot = 12;
3108        let max_full_snapshot_slot = 23;
3109        let min_incremental_snapshot_slot = 34;
3110        let max_incremental_snapshot_slot = 45;
3111        common_create_snapshot_archive_files(
3112            &full_snapshot_archives_dir
3113                .path()
3114                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3115            &incremental_snapshot_archives_dir
3116                .path()
3117                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3118            min_full_snapshot_slot,
3119            max_full_snapshot_slot,
3120            min_incremental_snapshot_slot,
3121            max_incremental_snapshot_slot,
3122        );
3123
3124        let incremental_snapshot_archives =
3125            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3126        assert_eq!(
3127            incremental_snapshot_archives.len() as Slot,
3128            (max_full_snapshot_slot - min_full_snapshot_slot)
3129                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3130        );
3131        assert!(incremental_snapshot_archives
3132            .iter()
3133            .all(|info| info.is_remote()));
3134    }
3135
3136    #[test]
3137    fn test_get_highest_full_snapshot_archive_slot() {
3138        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3139        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3140        let min_slot = 123;
3141        let max_slot = 456;
3142        common_create_snapshot_archive_files(
3143            full_snapshot_archives_dir.path(),
3144            incremental_snapshot_archives_dir.path(),
3145            min_slot,
3146            max_slot,
3147            0,
3148            0,
3149        );
3150
3151        assert_eq!(
3152            get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
3153            Some(max_slot - 1)
3154        );
3155    }
3156
3157    #[test]
3158    fn test_get_highest_incremental_snapshot_slot() {
3159        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3160        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3161        let min_full_snapshot_slot = 12;
3162        let max_full_snapshot_slot = 23;
3163        let min_incremental_snapshot_slot = 34;
3164        let max_incremental_snapshot_slot = 45;
3165        common_create_snapshot_archive_files(
3166            full_snapshot_archives_dir.path(),
3167            incremental_snapshot_archives_dir.path(),
3168            min_full_snapshot_slot,
3169            max_full_snapshot_slot,
3170            min_incremental_snapshot_slot,
3171            max_incremental_snapshot_slot,
3172        );
3173
3174        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3175            assert_eq!(
3176                get_highest_incremental_snapshot_archive_slot(
3177                    incremental_snapshot_archives_dir.path(),
3178                    full_snapshot_slot
3179                ),
3180                Some(max_incremental_snapshot_slot - 1)
3181            );
3182        }
3183
3184        assert_eq!(
3185            get_highest_incremental_snapshot_archive_slot(
3186                incremental_snapshot_archives_dir.path(),
3187                max_full_snapshot_slot
3188            ),
3189            None
3190        );
3191    }
3192
3193    fn common_test_purge_old_snapshot_archives(
3194        snapshot_names: &[&String],
3195        maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3196        maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3197        expected_snapshots: &[&String],
3198    ) {
3199        let temp_snap_dir = tempfile::TempDir::new().unwrap();
3200
3201        for snap_name in snapshot_names {
3202            let snap_path = temp_snap_dir.path().join(snap_name);
3203            let mut _snap_file = fs::File::create(snap_path);
3204        }
3205        purge_old_snapshot_archives(
3206            temp_snap_dir.path(),
3207            temp_snap_dir.path(),
3208            maximum_full_snapshot_archives_to_retain,
3209            maximum_incremental_snapshot_archives_to_retain,
3210        );
3211
3212        let mut retained_snaps = HashSet::new();
3213        for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3214            let entry_path_buf = entry.unwrap().path();
3215            let entry_path = entry_path_buf.as_path();
3216            let snapshot_name = entry_path
3217                .file_name()
3218                .unwrap()
3219                .to_str()
3220                .unwrap()
3221                .to_string();
3222            retained_snaps.insert(snapshot_name);
3223        }
3224
3225        for snap_name in expected_snapshots {
3226            assert!(
3227                retained_snaps.contains(snap_name.as_str()),
3228                "{snap_name} not found"
3229            );
3230        }
3231        assert_eq!(retained_snaps.len(), expected_snapshots.len());
3232    }
3233
3234    #[test]
3235    fn test_purge_old_full_snapshot_archives() {
3236        let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
3237        let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
3238        let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
3239        let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
3240
3241        // expecting only the newest to be retained
3242        let expected_snapshots = vec![&snap3_name];
3243        common_test_purge_old_snapshot_archives(
3244            &snapshot_names,
3245            NonZeroUsize::new(1).unwrap(),
3246            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3247            &expected_snapshots,
3248        );
3249
3250        // retaining 2, expecting the 2 newest to be retained
3251        let expected_snapshots = vec![&snap2_name, &snap3_name];
3252        common_test_purge_old_snapshot_archives(
3253            &snapshot_names,
3254            NonZeroUsize::new(2).unwrap(),
3255            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3256            &expected_snapshots,
3257        );
3258
3259        // retaining 3, all three should be retained
3260        let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
3261        common_test_purge_old_snapshot_archives(
3262            &snapshot_names,
3263            NonZeroUsize::new(3).unwrap(),
3264            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3265            &expected_snapshots,
3266        );
3267    }
3268
3269    /// Mimic a running node's behavior w.r.t. purging old snapshot archives.  Take snapshots in a
3270    /// loop, and periodically purge old snapshot archives.  After purging, check to make sure the
3271    /// snapshot archives on disk are correct.
3272    #[test]
3273    fn test_purge_old_full_snapshot_archives_in_the_loop() {
3274        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3275        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3276        let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
3277        let starting_slot: Slot = 42;
3278
3279        for slot in (starting_slot..).take(100) {
3280            let full_snapshot_archive_file_name =
3281                format!("snapshot-{}-{}.tar", slot, Hash::default());
3282            let full_snapshot_archive_path = full_snapshot_archives_dir
3283                .as_ref()
3284                .join(full_snapshot_archive_file_name);
3285            fs::File::create(full_snapshot_archive_path).unwrap();
3286
3287            // don't purge-and-check until enough snapshot archives have been created
3288            if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
3289                continue;
3290            }
3291
3292            // purge infrequently, so there will always be snapshot archives to purge
3293            if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
3294                continue;
3295            }
3296
3297            purge_old_snapshot_archives(
3298                &full_snapshot_archives_dir,
3299                &incremental_snapshot_archives_dir,
3300                maximum_snapshots_to_retain,
3301                NonZeroUsize::new(usize::MAX).unwrap(),
3302            );
3303            let mut full_snapshot_archives =
3304                get_full_snapshot_archives(&full_snapshot_archives_dir);
3305            full_snapshot_archives.sort_unstable();
3306            assert_eq!(
3307                full_snapshot_archives.len(),
3308                maximum_snapshots_to_retain.get()
3309            );
3310            assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
3311            for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
3312                assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
3313            }
3314        }
3315    }
3316
3317    #[test]
3318    fn test_purge_old_incremental_snapshot_archives() {
3319        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3320        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3321        let starting_slot = 100_000;
3322
3323        let maximum_incremental_snapshot_archives_to_retain =
3324            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3325        let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3326
3327        let incremental_snapshot_interval = 100;
3328        let num_incremental_snapshots_per_full_snapshot =
3329            maximum_incremental_snapshot_archives_to_retain.get() * 2;
3330        let full_snapshot_interval =
3331            incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
3332
3333        let mut snapshot_filenames = vec![];
3334        (starting_slot..)
3335            .step_by(full_snapshot_interval)
3336            .take(
3337                maximum_full_snapshot_archives_to_retain
3338                    .checked_mul(NonZeroUsize::new(2).unwrap())
3339                    .unwrap()
3340                    .get(),
3341            )
3342            .for_each(|full_snapshot_slot| {
3343                let snapshot_filename =
3344                    format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3345                let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
3346                fs::File::create(snapshot_path).unwrap();
3347                snapshot_filenames.push(snapshot_filename);
3348
3349                (full_snapshot_slot..)
3350                    .step_by(incremental_snapshot_interval)
3351                    .take(num_incremental_snapshots_per_full_snapshot)
3352                    .skip(1)
3353                    .for_each(|incremental_snapshot_slot| {
3354                        let snapshot_filename = format!(
3355                            "incremental-snapshot-{}-{}-{}.tar",
3356                            full_snapshot_slot,
3357                            incremental_snapshot_slot,
3358                            Hash::default()
3359                        );
3360                        let snapshot_path = incremental_snapshot_archives_dir
3361                            .path()
3362                            .join(&snapshot_filename);
3363                        fs::File::create(snapshot_path).unwrap();
3364                        snapshot_filenames.push(snapshot_filename);
3365                    });
3366            });
3367
3368        purge_old_snapshot_archives(
3369            full_snapshot_archives_dir.path(),
3370            incremental_snapshot_archives_dir.path(),
3371            maximum_full_snapshot_archives_to_retain,
3372            maximum_incremental_snapshot_archives_to_retain,
3373        );
3374
3375        // Ensure correct number of full snapshot archives are purged/retained
3376        let mut remaining_full_snapshot_archives =
3377            get_full_snapshot_archives(full_snapshot_archives_dir.path());
3378        assert_eq!(
3379            remaining_full_snapshot_archives.len(),
3380            maximum_full_snapshot_archives_to_retain.get(),
3381        );
3382        remaining_full_snapshot_archives.sort_unstable();
3383        let latest_full_snapshot_archive_slot =
3384            remaining_full_snapshot_archives.last().unwrap().slot();
3385
3386        // Ensure correct number of incremental snapshot archives are purged/retained
3387        // For each additional full snapshot archive, one additional (the newest)
3388        // incremental snapshot archive is retained. This is accounted for by the
3389        // `+ maximum_full_snapshot_archives_to_retain.saturating_sub(1)`
3390        let mut remaining_incremental_snapshot_archives =
3391            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3392        assert_eq!(
3393            remaining_incremental_snapshot_archives.len(),
3394            maximum_incremental_snapshot_archives_to_retain
3395                .get()
3396                .saturating_add(
3397                    maximum_full_snapshot_archives_to_retain
3398                        .get()
3399                        .saturating_sub(1)
3400                )
3401        );
3402        remaining_incremental_snapshot_archives.sort_unstable();
3403        remaining_incremental_snapshot_archives.reverse();
3404
3405        // Ensure there exists one incremental snapshot all but the latest full snapshot
3406        for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
3407            let incremental_snapshot_archive =
3408                remaining_incremental_snapshot_archives.pop().unwrap();
3409
3410            let expected_base_slot =
3411                latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
3412            assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
3413            let expected_slot = expected_base_slot
3414                + (full_snapshot_interval - incremental_snapshot_interval) as u64;
3415            assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
3416        }
3417
3418        // Ensure all remaining incremental snapshots are only for the latest full snapshot
3419        for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
3420            assert_eq!(
3421                incremental_snapshot_archive.base_slot(),
3422                latest_full_snapshot_archive_slot
3423            );
3424        }
3425
3426        // Ensure the remaining incremental snapshots are at the right slot
3427        let expected_remaining_incremental_snapshot_archive_slots =
3428            (latest_full_snapshot_archive_slot..)
3429                .step_by(incremental_snapshot_interval)
3430                .take(num_incremental_snapshots_per_full_snapshot)
3431                .skip(
3432                    num_incremental_snapshots_per_full_snapshot
3433                        - maximum_incremental_snapshot_archives_to_retain.get(),
3434                )
3435                .collect::<HashSet<_>>();
3436
3437        let actual_remaining_incremental_snapshot_archive_slots =
3438            remaining_incremental_snapshot_archives
3439                .iter()
3440                .map(|snapshot| snapshot.slot())
3441                .collect::<HashSet<_>>();
3442        assert_eq!(
3443            actual_remaining_incremental_snapshot_archive_slots,
3444            expected_remaining_incremental_snapshot_archive_slots
3445        );
3446    }
3447
3448    #[test]
3449    fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
3450        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3451        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3452
3453        for snapshot_filenames in [
3454            format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
3455            format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
3456            format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
3457            format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
3458            format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
3459            format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
3460            format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
3461            format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
3462        ] {
3463            let snapshot_path = incremental_snapshot_archives_dir
3464                .path()
3465                .join(snapshot_filenames);
3466            fs::File::create(snapshot_path).unwrap();
3467        }
3468
3469        purge_old_snapshot_archives(
3470            full_snapshot_archives_dir.path(),
3471            incremental_snapshot_archives_dir.path(),
3472            NonZeroUsize::new(usize::MAX).unwrap(),
3473            NonZeroUsize::new(usize::MAX).unwrap(),
3474        );
3475
3476        let remaining_incremental_snapshot_archives =
3477            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3478        assert!(remaining_incremental_snapshot_archives.is_empty());
3479    }
3480
3481    #[test]
3482    fn test_get_snapshot_accounts_hardlink_dir() {
3483        let slot: Slot = 1;
3484
3485        let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
3486
3487        let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
3488        let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
3489        let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
3490        fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
3491
3492        let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
3493        let appendvec_filename = format!("{slot}.0");
3494        let appendvec_path = accounts_dir.join(appendvec_filename);
3495
3496        let ret = get_snapshot_accounts_hardlink_dir(
3497            &appendvec_path,
3498            slot,
3499            &mut account_paths_set,
3500            &accounts_hardlinks_dir,
3501        );
3502        assert!(ret.is_ok());
3503
3504        let wrong_appendvec_path = appendvec_path
3505            .parent()
3506            .unwrap()
3507            .parent()
3508            .unwrap()
3509            .join(appendvec_path.file_name().unwrap());
3510        let ret = get_snapshot_accounts_hardlink_dir(
3511            &wrong_appendvec_path,
3512            slot,
3513            &mut account_paths_set,
3514            accounts_hardlinks_dir,
3515        );
3516
3517        assert_matches!(
3518            ret,
3519            Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
3520        );
3521    }
3522
3523    #[test]
3524    fn test_full_snapshot_slot_file_good() {
3525        let slot_written = 123_456_789;
3526        let bank_snapshot_dir = TempDir::new().unwrap();
3527        write_full_snapshot_slot_file(&bank_snapshot_dir, slot_written).unwrap();
3528
3529        let slot_read = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();
3530        assert_eq!(slot_read, slot_written);
3531    }
3532
3533    #[test]
3534    fn test_full_snapshot_slot_file_bad() {
3535        const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
3536        let too_small = [1u8; SLOT_SIZE - 1];
3537        let too_large = [1u8; SLOT_SIZE + 1];
3538
3539        for contents in [too_small.as_slice(), too_large.as_slice()] {
3540            let bank_snapshot_dir = TempDir::new().unwrap();
3541            let full_snapshot_slot_path = bank_snapshot_dir
3542                .as_ref()
3543                .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
3544            fs::write(full_snapshot_slot_path, contents).unwrap();
3545
3546            let err = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap_err();
3547            assert!(err
3548                .to_string()
3549                .starts_with("invalid full snapshot slot file size"));
3550        }
3551    }
3552}