1use {
2 crate::{
3 bank::{BankFieldsToSerialize, BankHashStats, BankSlotDelta},
4 serde_snapshot::{
5 self, BankIncrementalSnapshotPersistence, ExtraFieldsToSerialize, SnapshotStreams,
6 },
7 snapshot_archive_info::{
8 FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
9 SnapshotArchiveInfoGetter,
10 },
11 snapshot_bank_utils,
12 snapshot_config::SnapshotConfig,
13 snapshot_hash::SnapshotHash,
14 snapshot_package::{SnapshotKind, SnapshotPackage},
15 snapshot_utils::snapshot_storage_rebuilder::{
16 RebuiltSnapshotStorage, SnapshotStorageRebuilder,
17 },
18 },
19 bzip2::bufread::BzDecoder,
20 crossbeam_channel::Sender,
21 flate2::read::GzDecoder,
22 lazy_static::lazy_static,
23 log::*,
24 regex::Regex,
25 solana_accounts_db::{
26 account_storage::{meta::StoredMetaWriteVersion, AccountStorageMap},
27 accounts_db::{AccountStorageEntry, AtomicAccountsFileId},
28 accounts_file::{AccountsFile, AccountsFileError, InternalsForArchive, StorageAccess},
29 accounts_hash::{AccountsDeltaHash, AccountsHash},
30 epoch_accounts_hash::EpochAccountsHash,
31 hardened_unpack::{self, ParallelSelector, UnpackError},
32 shared_buffer_reader::{SharedBuffer, SharedBufferReader},
33 utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
34 },
35 solana_measure::{measure::Measure, measure_time, measure_us},
36 solana_sdk::{
37 clock::{Epoch, Slot},
38 hash::Hash,
39 },
40 std::{
41 cmp::Ordering,
42 collections::{HashMap, HashSet},
43 fmt, fs,
44 io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
45 mem,
46 num::NonZeroUsize,
47 ops::RangeInclusive,
48 path::{Path, PathBuf},
49 process::ExitStatus,
50 str::FromStr,
51 sync::Arc,
52 thread::{Builder, JoinHandle},
53 },
54 tar::{self, Archive},
55 tempfile::TempDir,
56 thiserror::Error,
57};
58#[cfg(feature = "dev-context-only-utils")]
59use {
60 hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
61 solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
62};
63
64mod archive_format;
65pub mod snapshot_storage_rebuilder;
66pub use archive_format::*;
67
/// Name of the file, inside a bank snapshot directory, that holds the serialized status cache.
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
/// Name of the file, inside a bank snapshot directory, that holds the snapshot version string.
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
/// Name of the marker file whose presence means a bank snapshot directory was fully written.
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
/// Name of the directory, inside a bank snapshot directory, whose entries are symlinks to the
/// account snapshot directories used by that bank snapshot.
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
/// Directory name for downloaded snapshot archives.
/// NOTE(review): not referenced in this part of the file; presumably a subdirectory of the
/// snapshot archives directory — confirm.
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
/// Name of the file, inside a bank snapshot directory, that stores (as little-endian bytes) the
/// slot of the full snapshot this bank snapshot is, or is based on.
pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
/// Upper bound on the size of a snapshot data file (32 GiB).
/// NOTE(review): the enforcement site is not in this part of the file — confirm.
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
/// Upper bound, in bytes, on the size of a snapshot version file.
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8; // bytes
/// Version string of `SnapshotVersion::V1_2_0`.
const VERSION_STRING_V1_2_0: &str = "1.2.0";
/// Prefix of the temporary staging directories and archive files created while archiving a
/// snapshot; leftovers are cleaned up by `remove_tmp_snapshot_archives`.
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
/// File extension that marks a bank snapshot file as `BankSnapshotKind::Pre`.
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
/// Default number of full snapshot archives to retain.
// Built with a const `match` instead of `unsafe { new_unchecked(..) }`: the value is checked
// at compile time, so no `unsafe` is needed.
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(2) {
        Some(retain) => retain,
        None => panic!("DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN must be non-zero"),
    };
/// Default number of incremental snapshot archives to retain.
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(4) {
        Some(retain) => retain,
        None => panic!("DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN must be non-zero"),
    };
/// Pattern for full snapshot archive file names; named groups: `slot`, `hash`, `ext`.
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
/// Pattern for incremental snapshot archive file names; named groups: `base` (base full
/// snapshot slot), `slot`, `hash`, `ext`.
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
89
90#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
91pub enum SnapshotVersion {
92 #[default]
93 V1_2_0,
94}
95
96impl fmt::Display for SnapshotVersion {
97 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
98 f.write_str(From::from(*self))
99 }
100}
101
102impl From<SnapshotVersion> for &'static str {
103 fn from(snapshot_version: SnapshotVersion) -> &'static str {
104 match snapshot_version {
105 SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
106 }
107 }
108}
109
110impl FromStr for SnapshotVersion {
111 type Err = &'static str;
112
113 fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
114 let version_string = if version_string
116 .get(..1)
117 .is_some_and(|s| s.eq_ignore_ascii_case("v"))
118 {
119 &version_string[1..]
120 } else {
121 version_string
122 };
123 match version_string {
124 VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
125 _ => Err("unsupported snapshot version"),
126 }
127 }
128}
129
130impl SnapshotVersion {
131 pub fn as_str(self) -> &'static str {
132 <&str as From<Self>>::from(self)
133 }
134}
135
136#[derive(PartialEq, Eq, Debug)]
139pub struct BankSnapshotInfo {
140 pub slot: Slot,
142 pub snapshot_kind: BankSnapshotKind,
144 pub snapshot_dir: PathBuf,
146 pub snapshot_version: SnapshotVersion,
148}
149
150impl PartialOrd for BankSnapshotInfo {
151 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
152 Some(self.cmp(other))
153 }
154}
155
156impl Ord for BankSnapshotInfo {
158 fn cmp(&self, other: &Self) -> Ordering {
159 self.slot.cmp(&other.slot)
160 }
161}
162
163impl BankSnapshotInfo {
164 pub fn new_from_dir(
165 bank_snapshots_dir: impl AsRef<Path>,
166 slot: Slot,
167 ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
168 let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
171
172 if !bank_snapshot_dir.is_dir() {
173 return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
174 bank_snapshot_dir,
175 ));
176 }
177
178 if !is_bank_snapshot_complete(&bank_snapshot_dir) {
184 return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
185 }
186
187 let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
188 if !status_cache_file.is_file() {
189 return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
190 status_cache_file,
191 ));
192 }
193
194 let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
195 let version_str = snapshot_version_from_file(&version_path).or(Err(
196 SnapshotNewFromDirError::MissingVersionFile(version_path),
197 ))?;
198 let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
199 .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
200
201 let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
202 let bank_snapshot_pre_path =
203 bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
204
205 let snapshot_kind = if bank_snapshot_pre_path.is_file() {
214 BankSnapshotKind::Pre
215 } else if bank_snapshot_post_path.is_file() {
216 BankSnapshotKind::Post
217 } else {
218 return Err(SnapshotNewFromDirError::MissingSnapshotFile(
219 bank_snapshot_dir,
220 ));
221 };
222
223 Ok(BankSnapshotInfo {
224 slot,
225 snapshot_kind,
226 snapshot_dir: bank_snapshot_dir,
227 snapshot_version,
228 })
229 }
230
231 pub fn snapshot_path(&self) -> PathBuf {
232 let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
233
234 let ext = match self.snapshot_kind {
235 BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
236 BankSnapshotKind::Post => "",
237 };
238 bank_snapshot_path.set_extension(ext);
239
240 bank_snapshot_path
241 }
242}
243
/// Which of the two bank snapshot file variants a bank snapshot directory holds.
///
/// On disk the variants differ only in the bank snapshot file's name: see
/// `BankSnapshotInfo::new_from_dir` and `BankSnapshotInfo::snapshot_path`.
/// NOTE(review): presumably "Pre"/"Post" refer to before/after some accounts-hash step —
/// confirm against the code that produces each kind.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum BankSnapshotKind {
    /// The bank snapshot file carries the `.pre` extension
    /// (`BANK_SNAPSHOT_PRE_FILENAME_EXTENSION`).
    Pre,
    /// The bank snapshot file has no extension.
    Post,
}

/// Source a snapshot is loaded from.
/// NOTE(review): not used in the visible part of this file; presumably distinguishes loading
/// from snapshot archives vs. from bank snapshot directories — confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotFrom {
    /// Load from a snapshot archive.
    Archive,
    /// Load from a bank snapshot directory.
    Dir,
}
270
/// Paths to the bank snapshot files at the root of a set of snapshots: the full snapshot's and,
/// optionally, an incremental snapshot's.
/// NOTE(review): not constructed in the visible part of this file; meaning inferred from the
/// field names — confirm.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    pub full_snapshot_root_file_path: PathBuf,
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}

/// Result of unpacking a snapshot archive into a temporary directory.
#[derive(Debug)]
pub struct UnarchivedSnapshot {
    // Never read: held only so the temporary unpack directory lives as long as this value
    // (`tempfile::TempDir` deletes its directory when dropped).
    #[allow(dead_code)]
    unpack_dir: TempDir,
    /// Account storages of the snapshot.
    /// NOTE(review): presumably rebuilt from the archive's accounts files — confirm.
    pub storage: AccountStorageMap,
    /// Directory of the unpacked snapshot files plus the snapshot's format version.
    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    /// Timing measurement for untarring (by name; confirm what span it covers).
    pub measure_untar: Measure,
}

/// A directory of unpacked snapshot files together with the snapshot's format version.
#[derive(Debug)]
pub struct UnpackedSnapshotsDirAndVersion {
    pub unpacked_snapshots_dir: PathBuf,
    pub snapshot_version: SnapshotVersion,
}

/// Account storages paired with an accounts-file id counter.
/// NOTE(review): presumably `next_append_vec_id` is the next unused accounts file id to hand
/// out after loading these storages — confirm.
pub(crate) struct StorageAndNextAccountsFileId {
    pub storage: AccountStorageMap,
    pub next_append_vec_id: AtomicAccountsFileId,
}
302
/// Top-level error type for creating, archiving, and loading snapshots in this module.
///
/// The `#[error]` attribute on each variant defines its `Display` message. Variants marked
/// `#[from]` are produced automatically by `?` on the wrapped error type.
#[derive(Error, Debug)]
// NOTE(review): the lint is silenced rather than boxing the larger variants — confirm this
// is intentional.
#[allow(clippy::large_enum_variant)]
pub enum SnapshotError {
    #[error("I/O error: {0}")]
    Io(#[from] IoError),

    #[error("AccountsFile error: {0}")]
    AccountsFileError(#[from] AccountsFileError),

    #[error("serialization error: {0}")]
    Serialize(#[from] bincode::Error),

    #[error("crossbeam send error: {0}")]
    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),

    #[error("archive generation failure {0}")]
    ArchiveGenerationFailure(ExitStatus),

    #[error("Unpack error: {0}")]
    UnpackError(#[from] UnpackError),

    /// An I/O error tagged with a static description of where it happened.
    #[error("source({1}) - I/O error: {0}")]
    IoWithSource(IoError, &'static str),

    #[error("could not get file name from path '{0}'")]
    PathToFileNameError(PathBuf),

    #[error("could not get str from file name '{0}'")]
    FileNameToStrError(PathBuf),

    #[error("could not parse snapshot archive's file name '{0}'")]
    ParseSnapshotArchiveFileNameError(String),

    /// Fields: (full snapshot slot, incremental snapshot base slot).
    #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
    MismatchedBaseSlot(Slot, Slot),

    #[error("no snapshot archives to load from '{0}'")]
    NoSnapshotArchives(PathBuf),

    /// Fields: (slot of the deserialized bank, slot of the snapshot archive).
    #[error("snapshot slot mismatch: deserialized bank: {0}, snapshot archive: {1}")]
    MismatchedSlot(Slot, Slot),

    /// Fields: (hash of the deserialized bank, hash of the snapshot archive).
    #[error("snapshot hash mismatch: deserialized bank: {0:?}, snapshot archive: {1:?}")]
    MismatchedHash(SnapshotHash, SnapshotHash),

    #[error("snapshot slot deltas are invalid: {0}")]
    VerifySlotDeltas(#[from] VerifySlotDeltasError),

    #[error("snapshot epoch stakes are invalid: {0}")]
    VerifyEpochStakes(#[from] VerifyEpochStakesError),

    #[error("bank_snapshot_info new_from_dir failed: {0}")]
    NewFromDir(#[from] SnapshotNewFromDirError),

    #[error("invalid snapshot dir path '{0}'")]
    InvalidSnapshotDirPath(PathBuf),

    #[error("invalid AppendVec path '{0}'")]
    InvalidAppendVecPath(PathBuf),

    #[error("invalid account path '{0}'")]
    InvalidAccountPath(PathBuf),

    #[error("no valid snapshot dir found under '{0}'")]
    NoSnapshotSlotDir(PathBuf),

    #[error("snapshot dir account paths mismatching")]
    AccountPathsMismatch,

    /// Failure while writing a bank snapshot (see `serialize_snapshot`) for the given slot.
    #[error("failed to add bank snapshot for slot {1}: {0}")]
    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),

    #[error("failed to archive snapshot package: {0}")]
    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),

    #[error("failed to rebuild snapshot storages: {0}")]
    RebuildStorages(String),
}
381
/// Reasons `BankSnapshotInfo::new_from_dir` rejects a bank snapshot directory.
#[derive(Error, Debug)]
pub enum SnapshotNewFromDirError {
    /// The path does not exist or is not a directory.
    #[error("invalid bank snapshot directory '{0}'")]
    InvalidBankSnapshotDir(PathBuf),

    #[error("missing status cache file '{0}'")]
    MissingStatusCacheFile(PathBuf),

    /// The version file could not be read. Despite the name, this covers any read failure,
    /// not only a missing file.
    #[error("missing version file '{0}'")]
    MissingVersionFile(PathBuf),

    /// The version file's contents are not a supported `SnapshotVersion`.
    #[error("invalid snapshot version '{0}'")]
    InvalidVersion(String),

    /// The directory lacks the state-complete marker file.
    #[error("snapshot directory incomplete '{0}'")]
    IncompleteDir(PathBuf),

    /// Neither the `.pre` nor the extension-less bank snapshot file exists in this directory.
    #[error("missing snapshot file '{0}'")]
    MissingSnapshotFile(PathBuf),
}

/// Result type used throughout this module, with `SnapshotError` as the error.
pub type Result<T> = std::result::Result<T, SnapshotError>;
404
/// Problems found while verifying a snapshot's status cache slot deltas.
/// NOTE(review): the verification logic is not in the visible part of this file.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifySlotDeltasError {
    /// Fields: (number of entries, maximum allowed).
    #[error("too many entries: {0} (max: {1})")]
    TooManyEntries(usize, usize),

    #[error("slot {0} is not a root")]
    SlotIsNotRoot(Slot),

    /// Fields: (offending slot, bank slot).
    /// NOTE(review): the variant name says "max root" but the message says "bank slot".
    #[error("slot {0} is greater than bank slot {1}")]
    SlotGreaterThanMaxRoot(Slot, Slot),

    #[error("slot {0} has multiple entries")]
    SlotHasMultipleEntries(Slot),

    #[error("slot {0} was not found in slot history")]
    SlotNotFoundInHistory(Slot),

    #[error("slot {0} was in history but missing from slot deltas")]
    SlotNotFoundInDeltas(Slot),

    #[error("slot history is bad and cannot be used to verify slot deltas")]
    BadSlotHistory,
}

/// Problems found while verifying a snapshot's epoch stakes.
/// NOTE(review): the verification logic is not in the visible part of this file.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifyEpochStakesError {
    /// Fields: (epoch found, maximum allowed epoch).
    #[error("epoch {0} is greater than the max {1}")]
    EpochGreaterThanMax(Epoch, Epoch),

    /// Fields: (missing epoch, range of epochs that must be present).
    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
    StakesNotFound(Epoch, RangeInclusive<Epoch>),
}
439
/// Failures while writing a bank snapshot directory in `serialize_snapshot`.
///
/// Each variant corresponds to one step of that function, in the order the steps run. The
/// `PathBuf` fields name the file or directory involved.
#[derive(Error, Debug)]
pub enum AddBankSnapshotError {
    #[error("bank snapshot dir already exists '{0}'")]
    SnapshotDirAlreadyExists(PathBuf),

    #[error("failed to create snapshot dir '{1}': {0}")]
    CreateSnapshotDir(#[source] IoError, PathBuf),

    #[error("failed to flush storage '{1}': {0}")]
    FlushStorage(#[source] AccountsFileError, PathBuf),

    #[error("failed to hard link storages: {0}")]
    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),

    // Boxed because `SnapshotError` itself contains `AddBankSnapshotError` (the type would
    // otherwise be infinitely sized).
    #[error("failed to serialize bank: {0}")]
    SerializeBank(#[source] Box<SnapshotError>),

    #[error("failed to serialize status cache: {0}")]
    SerializeStatusCache(#[source] Box<SnapshotError>),

    #[error("failed to write snapshot version file '{1}': {0}")]
    WriteSnapshotVersionFile(#[source] IoError, PathBuf),

    #[error("failed to mark snapshot as 'complete': failed to create file '{1}': {0}")]
    CreateStateCompleteFile(#[source] IoError, PathBuf),
}
467
/// Failures while building a snapshot archive in `archive_snapshot`.
///
/// The `PathBuf` fields name the file or directory involved. The symlink variants carry
/// (source, link) paths.
#[derive(Error, Debug)]
pub enum ArchiveSnapshotPackageError {
    #[error("failed to create archive path '{1}': {0}")]
    CreateArchiveDir(#[source] IoError, PathBuf),

    #[error("failed to create staging dir inside '{1}': {0}")]
    CreateStagingDir(#[source] IoError, PathBuf),

    #[error("failed to create snapshot staging dir '{1}': {0}")]
    CreateSnapshotStagingDir(#[source] IoError, PathBuf),

    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to create archive file '{1}': {0}")]
    CreateArchiveFile(#[source] IoError, PathBuf),

    #[error("failed to archive version file: {0}")]
    ArchiveVersionFile(#[source] IoError),

    #[error("failed to archive snapshots dir: {0}")]
    ArchiveSnapshotsDir(#[source] IoError),

    #[error("failed to archive account storage file '{1}': {0}")]
    ArchiveAccountStorageFile(#[source] IoError, PathBuf),

    #[error("failed to archive snapshot: {0}")]
    FinishArchive(#[source] IoError),

    #[error("failed to create encoder: {0}")]
    CreateEncoder(#[source] IoError),

    #[error("failed to encode archive: {0}")]
    FinishEncoder(#[source] IoError),

    #[error("failed to query archive metadata '{1}': {0}")]
    QueryArchiveMetadata(#[source] IoError, PathBuf),

    #[error("failed to move archive from '{1}' to '{2}': {0}")]
    MoveArchive(#[source] IoError, PathBuf, PathBuf),
}
519
/// Failures while hard linking account storages into a bank snapshot
/// (`hard_link_storages_to_snapshot`, called from `serialize_snapshot`).
#[derive(Error, Debug)]
pub enum HardLinkStoragesToSnapshotError {
    #[error("failed to create accounts hard links dir '{1}': {0}")]
    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),

    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),

    /// Fields: (error, source storage path, link path).
    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
}

/// Failures while resolving or creating the hard link directory for a snapshot's accounts.
/// NOTE(review): the producing function is not in the visible part of this file.
#[derive(Error, Debug)]
pub enum GetSnapshotAccountsHardLinkDirError {
    #[error("invalid account storage path '{0}'")]
    GetAccountPath(PathBuf),

    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
    SymlinkSnapshotHardLinkDir {
        source: IoError,
        original: PathBuf,
        link: PathBuf,
    },
}
549
550pub fn clean_orphaned_account_snapshot_dirs(
558 bank_snapshots_dir: impl AsRef<Path>,
559 account_snapshot_paths: &[PathBuf],
560) -> IoResult<()> {
561 let mut account_snapshot_dirs_referenced = HashSet::new();
564 let snapshots = get_bank_snapshots(bank_snapshots_dir);
565 for snapshot in snapshots {
566 let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
567 let read_dir = fs::read_dir(&account_hardlinks_dir).map_err(|err| {
569 IoError::other(format!(
570 "failed to read account hardlinks dir '{}': {err}",
571 account_hardlinks_dir.display(),
572 ))
573 })?;
574 for entry in read_dir {
575 let path = entry?.path();
576 let target = fs::read_link(&path).map_err(|err| {
577 IoError::other(format!(
578 "failed to read symlink '{}': {err}",
579 path.display(),
580 ))
581 })?;
582 account_snapshot_dirs_referenced.insert(target);
583 }
584 }
585
586 for account_snapshot_path in account_snapshot_paths {
588 let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
589 IoError::other(format!(
590 "failed to read account snapshot dir '{}': {err}",
591 account_snapshot_path.display(),
592 ))
593 })?;
594 for entry in read_dir {
595 let path = entry?.path();
596 if !account_snapshot_dirs_referenced.contains(&path) {
597 info!(
598 "Removing orphaned account snapshot hardlink directory '{}'...",
599 path.display()
600 );
601 move_and_async_delete_path(&path);
602 }
603 }
604 }
605
606 Ok(())
607}
608
609pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
611 let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
612 return;
614 };
615
616 let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
617
618 let incomplete_dirs: Vec<_> = read_dir_iter
619 .filter_map(|entry| entry.ok())
620 .map(|entry| entry.path())
621 .filter(|path| path.is_dir())
622 .filter(is_incomplete)
623 .collect();
624
625 for incomplete_dir in incomplete_dirs {
627 let result = purge_bank_snapshot(&incomplete_dir);
628 match result {
629 Ok(_) => info!(
630 "Purged incomplete snapshot dir: {}",
631 incomplete_dir.display()
632 ),
633 Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
634 }
635 }
636}
637
638fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
640 let state_complete_path = bank_snapshot_dir
641 .as_ref()
642 .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
643 state_complete_path.is_file()
644}
645
646pub fn write_full_snapshot_slot_file(
648 bank_snapshot_dir: impl AsRef<Path>,
649 full_snapshot_slot: Slot,
650) -> IoResult<()> {
651 let full_snapshot_slot_path = bank_snapshot_dir
652 .as_ref()
653 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
654 fs::write(
655 &full_snapshot_slot_path,
656 Slot::to_le_bytes(full_snapshot_slot),
657 )
658 .map_err(|err| {
659 IoError::other(format!(
660 "failed to write full snapshot slot file '{}': {err}",
661 full_snapshot_slot_path.display(),
662 ))
663 })
664}
665
666pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<Slot> {
668 const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
669 let full_snapshot_slot_path = bank_snapshot_dir
670 .as_ref()
671 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
672 let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
673 if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
674 let error_message = format!(
675 "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
676 full_snapshot_slot_path.display(),
677 full_snapshot_slot_file_metadata.len(),
678 SLOT_SIZE,
679 );
680 return Err(IoError::other(error_message));
681 }
682 let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
683 let mut buffer = [0; SLOT_SIZE];
684 full_snapshot_slot_file.read_exact(&mut buffer)?;
685 let slot = Slot::from_le_bytes(buffer);
686 Ok(slot)
687}
688
689pub fn get_highest_loadable_bank_snapshot(
696 snapshot_config: &SnapshotConfig,
697) -> Option<BankSnapshotInfo> {
698 let highest_bank_snapshot =
699 get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
700
701 if !snapshot_config.should_generate_snapshots() {
704 return Some(highest_bank_snapshot);
705 }
706
707 let highest_full_snapshot_archive_slot =
710 get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
711 let full_snapshot_file_slot =
712 read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
713 (full_snapshot_file_slot == highest_full_snapshot_archive_slot).then_some(highest_bank_snapshot)
714}
715
716pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
719 if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
720 for entry in entries.flatten() {
721 if entry
722 .file_name()
723 .to_str()
724 .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
725 .unwrap_or(false)
726 {
727 let path = entry.path();
728 let result = if path.is_dir() {
729 fs::remove_dir_all(&path)
730 } else {
731 fs::remove_file(&path)
732 };
733 if let Err(err) = result {
734 warn!(
735 "Failed to remove temporary snapshot archive '{}': {err}",
736 path.display(),
737 );
738 }
739 }
740 }
741 }
742}
743
744pub fn serialize_and_archive_snapshot_package(
746 snapshot_package: SnapshotPackage,
747 snapshot_config: &SnapshotConfig,
748) -> Result<SnapshotArchiveInfo> {
749 let SnapshotPackage {
750 snapshot_kind,
751 slot: snapshot_slot,
752 block_height,
753 hash: snapshot_hash,
754 mut snapshot_storages,
755 status_cache_slot_deltas,
756 bank_fields_to_serialize,
757 bank_hash_stats,
758 accounts_delta_hash,
759 accounts_hash,
760 epoch_accounts_hash,
761 bank_incremental_snapshot_persistence,
762 write_version,
763 enqueued: _,
764 } = snapshot_package;
765
766 let bank_snapshot_info = serialize_snapshot(
767 &snapshot_config.bank_snapshots_dir,
768 snapshot_config.snapshot_version,
769 snapshot_storages.as_slice(),
770 status_cache_slot_deltas.as_slice(),
771 bank_fields_to_serialize,
772 bank_hash_stats,
773 accounts_delta_hash,
774 accounts_hash,
775 epoch_accounts_hash,
776 bank_incremental_snapshot_persistence.as_ref(),
777 write_version,
778 )?;
779
780 let full_snapshot_archive_slot = match snapshot_kind {
782 SnapshotKind::FullSnapshot => snapshot_slot,
783 SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
784 };
785 write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
786 .map_err(|err| {
787 IoError::other(format!(
788 "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, kind {snapshot_kind:?}: {err}",
789 ))
790 })?;
791
792 let snapshot_archive_path = match snapshot_package.snapshot_kind {
793 SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
794 &snapshot_config.full_snapshot_archives_dir,
795 snapshot_package.slot,
796 &snapshot_package.hash,
797 snapshot_config.archive_format,
798 ),
799 SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
800 snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
803 build_incremental_snapshot_archive_path(
804 &snapshot_config.incremental_snapshot_archives_dir,
805 incremental_snapshot_base_slot,
806 snapshot_package.slot,
807 &snapshot_package.hash,
808 snapshot_config.archive_format,
809 )
810 }
811 };
812
813 let snapshot_archive_info = archive_snapshot(
814 snapshot_kind,
815 snapshot_slot,
816 snapshot_hash,
817 snapshot_storages.as_slice(),
818 &bank_snapshot_info.snapshot_dir,
819 snapshot_archive_path,
820 snapshot_config.archive_format,
821 )?;
822
823 Ok(snapshot_archive_info)
824}
825
/// Writes a new bank snapshot directory for `bank_fields.slot` under `bank_snapshots_dir` and
/// returns a `BankSnapshotInfo` describing it.
///
/// Steps, in order:
/// 1. fail if the slot's bank snapshot directory already exists, otherwise create it;
/// 2. flush every storage in `snapshot_storages`;
/// 3. hard link those storages into the snapshot (`hard_link_storages_to_snapshot`);
/// 4. serialize the bank fields, together with the extra fields (lamports per signature,
///    incremental snapshot persistence, epoch accounts hash, versioned epoch stakes, accounts
///    lt hash), into the bank snapshot file;
/// 5. serialize `slot_deltas` into the status cache file;
/// 6. write `snapshot_version` into the version file;
/// 7. create the `SNAPSHOT_STATE_COMPLETE_FILENAME` marker.
///
/// The marker is written last, so a directory left behind by a failure part-way through is
/// reported as incomplete by `is_bank_snapshot_complete`. Such a directory can then be removed
/// by `purge_incomplete_bank_snapshots`. Nothing here deletes it on error.
///
/// Timings and file sizes are reported via the `snapshot_bank` datapoint.
///
/// NOTE(review): the bank snapshot file is written at `dir.join(get_snapshot_file_name(slot))`
/// with no `.pre` extension, yet the returned info has `snapshot_kind: Pre`.
/// `BankSnapshotInfo::new_from_dir` would classify this directory as `Post`, and
/// `snapshot_path()` on the returned value points at a `.pre` file this function did not
/// write. The visible caller only uses `snapshot_dir` — confirm `Pre` is intended.
///
/// # Errors
/// Every failure is wrapped as `SnapshotError::AddBankSnapshot(_, slot)`.
#[allow(clippy::too_many_arguments)]
fn serialize_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    snapshot_version: SnapshotVersion,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    slot_deltas: &[BankSlotDelta],
    mut bank_fields: BankFieldsToSerialize,
    bank_hash_stats: BankHashStats,
    accounts_delta_hash: AccountsDeltaHash,
    accounts_hash: AccountsHash,
    epoch_accounts_hash: Option<EpochAccountsHash>,
    bank_incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
    write_version: StoredMetaWriteVersion,
) -> Result<BankSnapshotInfo> {
    let slot = bank_fields.slot;

    // All steps live in this closure so their `AddBankSnapshotError`s can be wrapped with the
    // slot in one place at the end.
    let do_serialize_snapshot = || {
        let mut measure_everything = Measure::start("");
        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
        if bank_snapshot_dir.exists() {
            return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
                bank_snapshot_dir,
            ));
        }
        fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
            AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
        })?;

        let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
        info!(
            "Creating bank snapshot for slot {slot} at '{}'",
            bank_snapshot_path.display(),
        );

        let (_, flush_storages_us) = measure_us!({
            for storage in snapshot_storages {
                storage.flush().map_err(|err| {
                    AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
                })?;
            }
        });

        let (_, hard_link_storages_us) = measure_us!(hard_link_storages_to_snapshot(
            &bank_snapshot_dir,
            slot,
            snapshot_storages
        )
        .map_err(AddBankSnapshotError::HardLinkStorages)?);

        let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
            // Taken out first because `bank_fields` itself is moved into the serializer call
            // below, while the epoch stakes travel separately in `extra_fields`.
            let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
            let extra_fields = ExtraFieldsToSerialize {
                lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
                incremental_snapshot_persistence: bank_incremental_snapshot_persistence,
                epoch_accounts_hash,
                versioned_epoch_stakes,
                accounts_lt_hash: bank_fields.accounts_lt_hash.clone().map(Into::into),
            };
            serde_snapshot::serialize_bank_snapshot_into(
                stream,
                bank_fields,
                bank_hash_stats,
                accounts_delta_hash,
                accounts_hash,
                &get_storages_to_serialize(snapshot_storages),
                extra_fields,
                write_version,
            )?;
            Ok(())
        };
        let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
            serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
                .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
            "bank serialize"
        );

        let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
        let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
            snapshot_bank_utils::serialize_status_cache(slot_deltas, &status_cache_path)
                .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
        );

        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
        let (_, write_version_file_us) = measure_us!(fs::write(
            &version_path,
            snapshot_version.as_str().as_bytes(),
        )
        .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);

        // Written last: the marker's presence is what makes the directory count as complete.
        let state_complete_path = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
        let (_, write_state_complete_file_us) = measure_us!(fs::File::create(&state_complete_path)
            .map_err(|err| {
                AddBankSnapshotError::CreateStateCompleteFile(err, state_complete_path)
            })?);

        measure_everything.stop();

        // Report file sizes and per-step timings.
        // NOTE(review): the sizes are presumably monitored against MAX_SNAPSHOT_DATA_FILE_SIZE —
        // confirm.
        datapoint_info!(
            "snapshot_bank",
            ("slot", slot, i64),
            ("bank_size", bank_snapshot_consumed_size, i64),
            ("status_cache_size", status_cache_consumed_size, i64),
            ("flush_storages_us", flush_storages_us, i64),
            ("hard_link_storages_us", hard_link_storages_us, i64),
            ("bank_serialize_us", bank_serialize.as_us(), i64),
            ("status_cache_serialize_us", status_cache_serialize_us, i64),
            ("write_version_file_us", write_version_file_us, i64),
            (
                "write_state_complete_file_us",
                write_state_complete_file_us,
                i64
            ),
            ("total_us", measure_everything.as_us(), i64),
        );

        info!(
            "{} for slot {} at {}",
            bank_serialize,
            slot,
            bank_snapshot_path.display(),
        );

        Ok(BankSnapshotInfo {
            slot,
            snapshot_kind: BankSnapshotKind::Pre,
            snapshot_dir: bank_snapshot_dir,
            snapshot_version,
        })
    };

    do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
}
968
969fn archive_snapshot(
971 snapshot_kind: SnapshotKind,
972 snapshot_slot: Slot,
973 snapshot_hash: SnapshotHash,
974 snapshot_storages: &[Arc<AccountStorageEntry>],
975 bank_snapshot_dir: impl AsRef<Path>,
976 archive_path: impl AsRef<Path>,
977 archive_format: ArchiveFormat,
978) -> Result<SnapshotArchiveInfo> {
979 use ArchiveSnapshotPackageError as E;
980 const SNAPSHOTS_DIR: &str = "snapshots";
981 const ACCOUNTS_DIR: &str = "accounts";
982 info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");
983
984 let mut timer = Measure::start("snapshot_package-package_snapshots");
985 let tar_dir = archive_path
986 .as_ref()
987 .parent()
988 .expect("Tar output path is invalid");
989
990 fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;
991
992 let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
994 let staging_dir = tempfile::Builder::new()
995 .prefix(&format!("{}{}-", staging_dir_prefix, snapshot_slot))
996 .tempdir_in(tar_dir)
997 .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
998 let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);
999
1000 let slot_str = snapshot_slot.to_string();
1001 let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
1002 fs::create_dir_all(&staging_snapshot_dir)
1004 .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;
1005
1006 let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
1008 E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
1009 })?;
1010 let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
1011 let src_snapshot_file = src_snapshot_dir.join(slot_str);
1012 symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
1013 .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;
1014
1015 let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1018 let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1019 symlink::symlink_file(&src_status_cache, &staging_status_cache)
1020 .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;
1021
1022 let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
1024 let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1025 symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
1026 E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
1027 })?;
1028
1029 let staging_archive_path = tar_dir.join(format!(
1031 "{}{}.{}",
1032 staging_dir_prefix,
1033 snapshot_slot,
1034 archive_format.extension(),
1035 ));
1036
1037 {
1038 let mut archive_file = fs::File::create(&staging_archive_path)
1039 .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;
1040
1041 let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
1042 let mut archive = tar::Builder::new(encoder);
1043 archive
1046 .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
1047 .map_err(E::ArchiveVersionFile)?;
1048 archive
1049 .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
1050 .map_err(E::ArchiveSnapshotsDir)?;
1051
1052 for storage in snapshot_storages {
1053 let path_in_archive = Path::new(ACCOUNTS_DIR)
1054 .join(AccountsFile::file_name(storage.slot(), storage.id()));
1055 match storage.accounts.internals_for_archive() {
1056 InternalsForArchive::Mmap(data) => {
1057 let mut header = tar::Header::new_gnu();
1058 header.set_path(path_in_archive).map_err(|err| {
1059 E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
1060 })?;
1061 header.set_size(storage.capacity());
1062 header.set_cksum();
1063 archive.append(&header, data)
1064 }
1065 InternalsForArchive::FileIo(path) => {
1066 archive.append_path_with_name(path, path_in_archive)
1067 }
1068 }
1069 .map_err(|err| E::ArchiveAccountStorageFile(err, storage.path().to_path_buf()))?;
1070 }
1071
1072 archive.into_inner().map_err(E::FinishArchive)?;
1073 Ok(())
1074 };
1075
1076 match archive_format {
1077 ArchiveFormat::TarBzip2 => {
1078 let mut encoder =
1079 bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::best());
1080 do_archive_files(&mut encoder)?;
1081 encoder.finish().map_err(E::FinishEncoder)?;
1082 }
1083 ArchiveFormat::TarGzip => {
1084 let mut encoder =
1085 flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
1086 do_archive_files(&mut encoder)?;
1087 encoder.finish().map_err(E::FinishEncoder)?;
1088 }
1089 ArchiveFormat::TarZstd { config } => {
1090 let mut encoder =
1091 zstd::stream::Encoder::new(archive_file, config.compression_level)
1092 .map_err(E::CreateEncoder)?;
1093 do_archive_files(&mut encoder)?;
1094 encoder.finish().map_err(E::FinishEncoder)?;
1095 }
1096 ArchiveFormat::TarLz4 => {
1097 let mut encoder = lz4::EncoderBuilder::new()
1098 .level(1)
1099 .build(archive_file)
1100 .map_err(E::CreateEncoder)?;
1101 do_archive_files(&mut encoder)?;
1102 let (_output, result) = encoder.finish();
1103 result.map_err(E::FinishEncoder)?;
1104 }
1105 ArchiveFormat::Tar => {
1106 do_archive_files(&mut archive_file)?;
1107 }
1108 };
1109 }
1110
1111 let metadata = fs::metadata(&staging_archive_path)
1113 .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
1114 let archive_path = archive_path.as_ref().to_path_buf();
1115 fs::rename(&staging_archive_path, &archive_path)
1116 .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;
1117
1118 timer.stop();
1119 info!(
1120 "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
1121 archive_path.display(),
1122 snapshot_slot,
1123 timer.as_ms(),
1124 metadata.len()
1125 );
1126
1127 datapoint_info!(
1128 "archive-snapshot-package",
1129 ("slot", snapshot_slot, i64),
1130 ("archive_format", archive_format.to_string(), String),
1131 ("duration_ms", timer.as_ms(), i64),
1132 (
1133 if snapshot_kind.is_full_snapshot() {
1134 "full-snapshot-archive-size"
1135 } else {
1136 "incremental-snapshot-archive-size"
1137 },
1138 metadata.len(),
1139 i64
1140 ),
1141 );
1142 Ok(SnapshotArchiveInfo {
1143 path: archive_path,
1144 slot: snapshot_slot,
1145 hash: snapshot_hash,
1146 archive_format,
1147 })
1148}
1149
1150pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1152 let mut bank_snapshots = Vec::default();
1153 match fs::read_dir(&bank_snapshots_dir) {
1154 Err(err) => {
1155 info!(
1156 "Unable to read bank snapshots directory '{}': {err}",
1157 bank_snapshots_dir.as_ref().display(),
1158 );
1159 }
1160 Ok(paths) => paths
1161 .filter_map(|entry| {
1162 entry
1165 .ok()
1166 .filter(|entry| entry.path().is_dir())
1167 .and_then(|entry| {
1168 entry
1169 .path()
1170 .file_name()
1171 .and_then(|file_name| file_name.to_str())
1172 .and_then(|file_name| file_name.parse::<Slot>().ok())
1173 })
1174 })
1175 .for_each(
1176 |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1177 Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1178 Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1181 },
1182 ),
1183 }
1184 bank_snapshots
1185}
1186
1187pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1191 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1192 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1193 bank_snapshots
1194}
1195
1196pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1200 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1201 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1202 bank_snapshots
1203}
1204
1205pub fn get_highest_bank_snapshot_pre(
1209 bank_snapshots_dir: impl AsRef<Path>,
1210) -> Option<BankSnapshotInfo> {
1211 do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1212}
1213
1214pub fn get_highest_bank_snapshot_post(
1218 bank_snapshots_dir: impl AsRef<Path>,
1219) -> Option<BankSnapshotInfo> {
1220 do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1221}
1222
1223pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1227 do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1228}
1229
1230fn do_get_highest_bank_snapshot(
1231 mut bank_snapshots: Vec<BankSnapshotInfo>,
1232) -> Option<BankSnapshotInfo> {
1233 bank_snapshots.sort_unstable();
1234 bank_snapshots.into_iter().next_back()
1235}
1236
1237pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1238where
1239 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1240{
1241 serialize_snapshot_data_file_capped::<F>(
1242 data_file_path,
1243 MAX_SNAPSHOT_DATA_FILE_SIZE,
1244 serializer,
1245 )
1246}
1247
1248pub fn deserialize_snapshot_data_file<T: Sized>(
1249 data_file_path: &Path,
1250 deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1251) -> Result<T> {
1252 let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1253 deserializer(streams.full_snapshot_stream)
1254 };
1255
1256 let wrapped_data_file_path = SnapshotRootPaths {
1257 full_snapshot_root_file_path: data_file_path.to_path_buf(),
1258 incremental_snapshot_root_file_path: None,
1259 };
1260
1261 deserialize_snapshot_data_files_capped(
1262 &wrapped_data_file_path,
1263 MAX_SNAPSHOT_DATA_FILE_SIZE,
1264 wrapped_deserializer,
1265 )
1266}
1267
1268pub fn deserialize_snapshot_data_files<T: Sized>(
1269 snapshot_root_paths: &SnapshotRootPaths,
1270 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1271) -> Result<T> {
1272 deserialize_snapshot_data_files_capped(
1273 snapshot_root_paths,
1274 MAX_SNAPSHOT_DATA_FILE_SIZE,
1275 deserializer,
1276 )
1277}
1278
1279fn serialize_snapshot_data_file_capped<F>(
1280 data_file_path: &Path,
1281 maximum_file_size: u64,
1282 serializer: F,
1283) -> Result<u64>
1284where
1285 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1286{
1287 let data_file = fs::File::create(data_file_path)?;
1288 let mut data_file_stream = BufWriter::new(data_file);
1289 serializer(&mut data_file_stream)?;
1290 data_file_stream.flush()?;
1291
1292 let consumed_size = data_file_stream.stream_position()?;
1293 if consumed_size > maximum_file_size {
1294 let error_message = format!(
1295 "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1296 data_file_path.display(),
1297 );
1298 return Err(IoError::other(error_message).into());
1299 }
1300 Ok(consumed_size)
1301}
1302
1303fn deserialize_snapshot_data_files_capped<T: Sized>(
1304 snapshot_root_paths: &SnapshotRootPaths,
1305 maximum_file_size: u64,
1306 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1307) -> Result<T> {
1308 let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1309 create_snapshot_data_file_stream(
1310 &snapshot_root_paths.full_snapshot_root_file_path,
1311 maximum_file_size,
1312 )?;
1313
1314 let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1315 if let Some(ref incremental_snapshot_root_file_path) =
1316 snapshot_root_paths.incremental_snapshot_root_file_path
1317 {
1318 Some(create_snapshot_data_file_stream(
1319 incremental_snapshot_root_file_path,
1320 maximum_file_size,
1321 )?)
1322 } else {
1323 None
1324 }
1325 .unzip();
1326
1327 let mut snapshot_streams = SnapshotStreams {
1328 full_snapshot_stream: &mut full_snapshot_data_file_stream,
1329 incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1330 };
1331 let ret = deserializer(&mut snapshot_streams)?;
1332
1333 check_deserialize_file_consumed(
1334 full_snapshot_file_size,
1335 &snapshot_root_paths.full_snapshot_root_file_path,
1336 &mut full_snapshot_data_file_stream,
1337 )?;
1338
1339 if let Some(ref incremental_snapshot_root_file_path) =
1340 snapshot_root_paths.incremental_snapshot_root_file_path
1341 {
1342 check_deserialize_file_consumed(
1343 incremental_snapshot_file_size.unwrap(),
1344 incremental_snapshot_root_file_path,
1345 incremental_snapshot_data_file_stream.as_mut().unwrap(),
1346 )?;
1347 }
1348
1349 Ok(ret)
1350}
1351
1352fn create_snapshot_data_file_stream(
1355 snapshot_root_file_path: impl AsRef<Path>,
1356 maximum_file_size: u64,
1357) -> Result<(u64, BufReader<std::fs::File>)> {
1358 let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1359
1360 if snapshot_file_size > maximum_file_size {
1361 let error_message = format!(
1362 "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1363 snapshot_root_file_path.as_ref().display(),
1364 snapshot_file_size,
1365 maximum_file_size,
1366 );
1367 return Err(IoError::other(error_message).into());
1368 }
1369
1370 let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1371 let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1372
1373 Ok((snapshot_file_size, snapshot_data_file_stream))
1374}
1375
1376fn check_deserialize_file_consumed(
1379 file_size: u64,
1380 file_path: impl AsRef<Path>,
1381 file_stream: &mut BufReader<std::fs::File>,
1382) -> Result<()> {
1383 let consumed_size = file_stream.stream_position()?;
1384
1385 if consumed_size != file_size {
1386 let error_message = format!(
1387 "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1388 file_path.as_ref().display(),
1389 file_size,
1390 consumed_size,
1391 );
1392 return Err(IoError::other(error_message).into());
1393 }
1394
1395 Ok(())
1396}
1397
1398fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1400 let run_path = appendvec_path.parent()?;
1401 let run_file_name = run_path.file_name()?;
1402 if run_file_name != ACCOUNTS_RUN_DIR {
1405 error!(
1406 "The account path {} does not have run/ as its immediate parent directory.",
1407 run_path.display()
1408 );
1409 return None;
1410 }
1411 let account_path = run_path.parent()?;
1412 Some(account_path.to_path_buf())
1413}
1414
1415fn get_snapshot_accounts_hardlink_dir(
1418 appendvec_path: &Path,
1419 bank_slot: Slot,
1420 account_paths: &mut HashSet<PathBuf>,
1421 hardlinks_dir: impl AsRef<Path>,
1422) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1423 let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1424 GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1425 })?;
1426
1427 let snapshot_hardlink_dir = account_path
1428 .join(ACCOUNTS_SNAPSHOT_DIR)
1429 .join(bank_slot.to_string());
1430
1431 if !account_paths.contains(&account_path) {
1434 let idx = account_paths.len();
1435 debug!(
1436 "for appendvec_path {}, create hard-link path {}",
1437 appendvec_path.display(),
1438 snapshot_hardlink_dir.display()
1439 );
1440 fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1441 GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1442 err,
1443 snapshot_hardlink_dir.clone(),
1444 )
1445 })?;
1446 let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1447 symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1448 GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1449 source: err,
1450 original: snapshot_hardlink_dir.clone(),
1451 link: symlink_path,
1452 }
1453 })?;
1454 account_paths.insert(account_path);
1455 };
1456
1457 Ok(snapshot_hardlink_dir)
1458}
1459
1460pub fn hard_link_storages_to_snapshot(
1465 bank_snapshot_dir: impl AsRef<Path>,
1466 bank_slot: Slot,
1467 snapshot_storages: &[Arc<AccountStorageEntry>],
1468) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1469 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1470 fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1471 HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1472 err,
1473 accounts_hardlinks_dir.clone(),
1474 )
1475 })?;
1476
1477 let mut account_paths: HashSet<PathBuf> = HashSet::new();
1478 for storage in snapshot_storages {
1479 let storage_path = storage.accounts.path();
1480 let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1481 storage_path,
1482 bank_slot,
1483 &mut account_paths,
1484 &accounts_hardlinks_dir,
1485 )?;
1486 let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1489 let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1490 fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1491 HardLinkStoragesToSnapshotError::HardLinkStorage(
1492 err,
1493 storage_path.to_path_buf(),
1494 hard_link_path,
1495 )
1496 })?;
1497 }
1498 Ok(())
1499}
1500
1501pub(crate) fn get_storages_to_serialize(
1504 snapshot_storages: &[Arc<AccountStorageEntry>],
1505) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1506 snapshot_storages
1507 .iter()
1508 .map(|storage| vec![Arc::clone(storage)])
1509 .collect::<Vec<_>>()
1510}
1511
/// Upper bound on the number of parallel unpack threads used when unarchiving a snapshot.
///
/// `verify_and_unarchive_snapshots` clamps `num_cpus::get() / 4` into
/// `1..=PARALLEL_UNTAR_READERS_DEFAULT`.
const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1514
1515pub fn verify_and_unarchive_snapshots(
1517 bank_snapshots_dir: impl AsRef<Path>,
1518 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1519 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1520 account_paths: &[PathBuf],
1521 storage_access: StorageAccess,
1522) -> Result<(
1523 UnarchivedSnapshot,
1524 Option<UnarchivedSnapshot>,
1525 AtomicAccountsFileId,
1526)> {
1527 check_are_snapshots_compatible(
1528 full_snapshot_archive_info,
1529 incremental_snapshot_archive_info,
1530 )?;
1531
1532 let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1533
1534 let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1535 let unarchived_full_snapshot = unarchive_snapshot(
1536 &bank_snapshots_dir,
1537 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1538 full_snapshot_archive_info.path(),
1539 "snapshot untar",
1540 account_paths,
1541 full_snapshot_archive_info.archive_format(),
1542 parallel_divisions,
1543 next_append_vec_id.clone(),
1544 storage_access,
1545 )?;
1546
1547 let unarchived_incremental_snapshot =
1548 if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1549 let unarchived_incremental_snapshot = unarchive_snapshot(
1550 &bank_snapshots_dir,
1551 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1552 incremental_snapshot_archive_info.path(),
1553 "incremental snapshot untar",
1554 account_paths,
1555 incremental_snapshot_archive_info.archive_format(),
1556 parallel_divisions,
1557 next_append_vec_id.clone(),
1558 storage_access,
1559 )?;
1560 Some(unarchived_incremental_snapshot)
1561 } else {
1562 None
1563 };
1564
1565 Ok((
1566 unarchived_full_snapshot,
1567 unarchived_incremental_snapshot,
1568 Arc::try_unwrap(next_append_vec_id).unwrap(),
1569 ))
1570}
1571
1572fn spawn_unpack_snapshot_thread(
1574 file_sender: Sender<PathBuf>,
1575 account_paths: Arc<Vec<PathBuf>>,
1576 ledger_dir: Arc<PathBuf>,
1577 mut archive: Archive<SharedBufferReader>,
1578 parallel_selector: Option<ParallelSelector>,
1579 thread_index: usize,
1580) -> JoinHandle<()> {
1581 Builder::new()
1582 .name(format!("solUnpkSnpsht{thread_index:02}"))
1583 .spawn(move || {
1584 hardened_unpack::streaming_unpack_snapshot(
1585 &mut archive,
1586 ledger_dir.as_path(),
1587 &account_paths,
1588 parallel_selector,
1589 &file_sender,
1590 )
1591 .unwrap();
1592 })
1593 .unwrap()
1594}
1595
/// Starts `num_threads` threads that unpack `snapshot_archive_path` into `ledger_dir`, and
/// returns their join handles without waiting for them.
///
/// One shared buffer is created over the archive (`untar_snapshot_create_shared_buffer`).
/// Each thread gets:
/// - its own `SharedBufferReader` / `Archive` over that buffer,
/// - a `ParallelSelector { index: thread_index, divisions: num_threads }`, and
/// - a clone of `file_sender`, which is handed to `hardened_unpack::streaming_unpack_snapshot`
///   (presumably to report the paths of unpacked files).
///
/// The `file_sender` passed in is dropped when this function returns. After that, only the
/// threads hold senders, so the channel disconnects once all of them have exited.
fn streaming_unarchive_snapshot(
    file_sender: Sender<PathBuf>,
    account_paths: Vec<PathBuf>,
    ledger_dir: PathBuf,
    snapshot_archive_path: PathBuf,
    archive_format: ArchiveFormat,
    num_threads: usize,
) -> Vec<JoinHandle<()>> {
    // Shared (read-only) by every unpack thread.
    let account_paths = Arc::new(account_paths);
    let ledger_dir = Arc::new(ledger_dir);
    let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);

    // Every reader is created before any thread is spawned. NOTE(review): presumably this
    // guarantees all readers are attached to `shared_buffer` before any of them starts
    // consuming it. Confirm against `SharedBuffer` before fusing these two passes.
    let archives: Vec<_> = (0..num_threads)
        .map(|_| {
            let reader = SharedBufferReader::new(&shared_buffer);
            Archive::new(reader)
        })
        .collect();

    archives
        .into_iter()
        .enumerate()
        .map(|(thread_index, archive)| {
            // Thread `thread_index` handles its share of the `num_threads` divisions.
            let parallel_selector = Some(ParallelSelector {
                index: thread_index,
                divisions: num_threads,
            });

            spawn_unpack_snapshot_thread(
                file_sender.clone(),
                account_paths.clone(),
                ledger_dir.clone(),
                archive,
                parallel_selector,
                thread_index,
            )
        })
        .collect()
}
1637
1638fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1643 let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1644 if !snapshots_dir.is_dir() {
1645 return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1646 }
1647
1648 let slot_dir = std::fs::read_dir(&snapshots_dir)
1650 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1651 .find(|entry| entry.as_ref().unwrap().path().is_dir())
1652 .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1653 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1654 .path();
1655
1656 let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1657 fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1658
1659 let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1660 fs::hard_link(
1661 status_cache_file,
1662 slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1663 )?;
1664
1665 let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
1666 fs::File::create(state_complete_file)?;
1667
1668 Ok(())
1669}
1670
1671fn unarchive_snapshot(
1675 bank_snapshots_dir: impl AsRef<Path>,
1676 unpacked_snapshots_dir_prefix: &'static str,
1677 snapshot_archive_path: impl AsRef<Path>,
1678 measure_name: &'static str,
1679 account_paths: &[PathBuf],
1680 archive_format: ArchiveFormat,
1681 parallel_divisions: usize,
1682 next_append_vec_id: Arc<AtomicAccountsFileId>,
1683 storage_access: StorageAccess,
1684) -> Result<UnarchivedSnapshot> {
1685 let unpack_dir = tempfile::Builder::new()
1686 .prefix(unpacked_snapshots_dir_prefix)
1687 .tempdir_in(bank_snapshots_dir)?;
1688 let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
1689
1690 let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1691 streaming_unarchive_snapshot(
1692 file_sender,
1693 account_paths.to_vec(),
1694 unpack_dir.path().to_path_buf(),
1695 snapshot_archive_path.as_ref().to_path_buf(),
1696 archive_format,
1697 parallel_divisions,
1698 );
1699
1700 let num_rebuilder_threads = num_cpus::get_physical()
1701 .saturating_sub(parallel_divisions)
1702 .max(1);
1703 let (version_and_storages, measure_untar) = measure_time!(
1704 SnapshotStorageRebuilder::rebuild_storage(
1705 file_receiver,
1706 num_rebuilder_threads,
1707 next_append_vec_id,
1708 SnapshotFrom::Archive,
1709 storage_access,
1710 )?,
1711 measure_name
1712 );
1713 info!("{}", measure_untar);
1714
1715 create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
1716
1717 let RebuiltSnapshotStorage {
1718 snapshot_version,
1719 storage,
1720 } = version_and_storages;
1721 Ok(UnarchivedSnapshot {
1722 unpack_dir,
1723 storage,
1724 unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
1725 unpacked_snapshots_dir,
1726 snapshot_version,
1727 },
1728 measure_untar,
1729 })
1730}
1731
1732fn streaming_snapshot_dir_files(
1735 file_sender: Sender<PathBuf>,
1736 snapshot_file_path: impl Into<PathBuf>,
1737 snapshot_version_path: impl Into<PathBuf>,
1738 account_paths: &[PathBuf],
1739) -> Result<()> {
1740 file_sender.send(snapshot_file_path.into())?;
1741 file_sender.send(snapshot_version_path.into())?;
1742
1743 for account_path in account_paths {
1744 for file in fs::read_dir(account_path)? {
1745 file_sender.send(file?.path())?;
1746 }
1747 }
1748
1749 Ok(())
1750}
1751
1752pub fn rebuild_storages_from_snapshot_dir(
1757 snapshot_info: &BankSnapshotInfo,
1758 account_paths: &[PathBuf],
1759 next_append_vec_id: Arc<AtomicAccountsFileId>,
1760 storage_access: StorageAccess,
1761) -> Result<AccountStorageMap> {
1762 let bank_snapshot_dir = &snapshot_info.snapshot_dir;
1763 let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1764 let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
1765
1766 let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
1767 IoError::other(format!(
1768 "failed to read accounts hardlinks dir '{}': {err}",
1769 accounts_hardlinks.display(),
1770 ))
1771 })?;
1772 for dir_entry in read_dir {
1773 let symlink_path = dir_entry?.path();
1774 let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
1777 IoError::other(format!(
1778 "failed to read symlink '{}': {err}",
1779 symlink_path.display(),
1780 ))
1781 })?;
1782 let account_run_path = account_snapshot_path
1783 .parent()
1784 .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1785 .parent()
1786 .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1787 .join(ACCOUNTS_RUN_DIR);
1788 if !account_run_paths.contains(&account_run_path) {
1789 return Err(SnapshotError::AccountPathsMismatch);
1792 }
1793 let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
1796 IoError::other(format!(
1797 "failed to read account snapshot dir '{}': {err}",
1798 account_snapshot_path.display(),
1799 ))
1800 })?;
1801 for file in read_dir {
1802 let file_path = file?.path();
1803 let file_name = file_path
1804 .file_name()
1805 .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
1806 let dest_path = account_run_path.join(file_name);
1807 fs::hard_link(&file_path, &dest_path).map_err(|err| {
1808 IoError::other(format!(
1809 "failed to hard link from '{}' to '{}': {err}",
1810 file_path.display(),
1811 dest_path.display(),
1812 ))
1813 })?;
1814 }
1815 }
1816
1817 let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1818 let snapshot_file_path = &snapshot_info.snapshot_path();
1819 let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1820 streaming_snapshot_dir_files(
1821 file_sender,
1822 snapshot_file_path,
1823 snapshot_version_path,
1824 account_paths,
1825 )?;
1826
1827 let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
1828 let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
1829 file_receiver,
1830 num_rebuilder_threads,
1831 next_append_vec_id,
1832 SnapshotFrom::Dir,
1833 storage_access,
1834 )?;
1835
1836 let RebuiltSnapshotStorage {
1837 snapshot_version: _,
1838 storage,
1839 } = version_and_storages;
1840 Ok(storage)
1841}
1842
1843fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
1847 let file_metadata = fs::metadata(&path).map_err(|err| {
1849 IoError::other(format!(
1850 "failed to query snapshot version file metadata '{}': {err}",
1851 path.as_ref().display(),
1852 ))
1853 })?;
1854 let file_size = file_metadata.len();
1855 if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1856 let error_message = format!(
1857 "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1858 path.as_ref().display(),
1859 file_size,
1860 MAX_SNAPSHOT_VERSION_FILE_SIZE,
1861 );
1862 return Err(IoError::other(error_message).into());
1863 }
1864
1865 let mut snapshot_version = String::new();
1867 let mut file = fs::File::open(&path).map_err(|err| {
1868 IoError::other(format!(
1869 "failed to open snapshot version file '{}': {err}",
1870 path.as_ref().display()
1871 ))
1872 })?;
1873 file.read_to_string(&mut snapshot_version).map_err(|err| {
1874 IoError::other(format!(
1875 "failed to read snapshot version from file '{}': {err}",
1876 path.as_ref().display()
1877 ))
1878 })?;
1879
1880 Ok(snapshot_version.trim().to_string())
1881}
1882
1883fn check_are_snapshots_compatible(
1886 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1887 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1888) -> Result<()> {
1889 if incremental_snapshot_archive_info.is_none() {
1890 return Ok(());
1891 }
1892
1893 let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1894
1895 (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1896 .then_some(())
1897 .ok_or_else(|| {
1898 SnapshotError::MismatchedBaseSlot(
1899 full_snapshot_archive_info.slot(),
1900 incremental_snapshot_archive_info.base_slot(),
1901 )
1902 })
1903}
1904
1905pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
1907 path.file_name()
1908 .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
1909 .to_str()
1910 .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
1911}
1912
1913pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
1914 snapshot_archives_dir
1915 .as_ref()
1916 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
1917}
1918
1919pub fn build_full_snapshot_archive_path(
1922 full_snapshot_archives_dir: impl AsRef<Path>,
1923 slot: Slot,
1924 hash: &SnapshotHash,
1925 archive_format: ArchiveFormat,
1926) -> PathBuf {
1927 full_snapshot_archives_dir.as_ref().join(format!(
1928 "snapshot-{}-{}.{}",
1929 slot,
1930 hash.0,
1931 archive_format.extension(),
1932 ))
1933}
1934
1935pub fn build_incremental_snapshot_archive_path(
1939 incremental_snapshot_archives_dir: impl AsRef<Path>,
1940 base_slot: Slot,
1941 slot: Slot,
1942 hash: &SnapshotHash,
1943 archive_format: ArchiveFormat,
1944) -> PathBuf {
1945 incremental_snapshot_archives_dir.as_ref().join(format!(
1946 "incremental-snapshot-{}-{}-{}.{}",
1947 base_slot,
1948 slot,
1949 hash.0,
1950 archive_format.extension(),
1951 ))
1952}
1953
1954pub(crate) fn parse_full_snapshot_archive_filename(
1956 archive_filename: &str,
1957) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
1958 lazy_static! {
1959 static ref RE: Regex = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1960 }
1961
1962 let do_parse = || {
1963 RE.captures(archive_filename).and_then(|captures| {
1964 let slot = captures
1965 .name("slot")
1966 .map(|x| x.as_str().parse::<Slot>())?
1967 .ok()?;
1968 let hash = captures
1969 .name("hash")
1970 .map(|x| x.as_str().parse::<Hash>())?
1971 .ok()?;
1972 let archive_format = captures
1973 .name("ext")
1974 .map(|x| x.as_str().parse::<ArchiveFormat>())?
1975 .ok()?;
1976
1977 Some((slot, SnapshotHash(hash), archive_format))
1978 })
1979 };
1980
1981 do_parse().ok_or_else(|| {
1982 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
1983 })
1984}
1985
1986pub(crate) fn parse_incremental_snapshot_archive_filename(
1988 archive_filename: &str,
1989) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
1990 lazy_static! {
1991 static ref RE: Regex = Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1992 }
1993
1994 let do_parse = || {
1995 RE.captures(archive_filename).and_then(|captures| {
1996 let base_slot = captures
1997 .name("base")
1998 .map(|x| x.as_str().parse::<Slot>())?
1999 .ok()?;
2000 let slot = captures
2001 .name("slot")
2002 .map(|x| x.as_str().parse::<Slot>())?
2003 .ok()?;
2004 let hash = captures
2005 .name("hash")
2006 .map(|x| x.as_str().parse::<Hash>())?
2007 .ok()?;
2008 let archive_format = captures
2009 .name("ext")
2010 .map(|x| x.as_str().parse::<ArchiveFormat>())?
2011 .ok()?;
2012
2013 Some((base_slot, slot, SnapshotHash(hash), archive_format))
2014 })
2015 };
2016
2017 do_parse().ok_or_else(|| {
2018 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2019 })
2020}
2021
2022fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2024where
2025 F: Fn(PathBuf) -> Result<T>,
2026{
2027 let walk_dir = |dir: &Path| -> Vec<T> {
2028 let entry_iter = fs::read_dir(dir);
2029 match entry_iter {
2030 Err(err) => {
2031 info!(
2032 "Unable to read snapshot archives directory '{}': {err}",
2033 dir.display(),
2034 );
2035 vec![]
2036 }
2037 Ok(entries) => entries
2038 .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2039 .collect(),
2040 }
2041 };
2042
2043 let mut ret = walk_dir(snapshot_archives_dir);
2044 let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2045 if remote_dir.exists() {
2046 ret.append(&mut walk_dir(remote_dir.as_ref()));
2047 }
2048 ret
2049}
2050
2051pub fn get_full_snapshot_archives(
2053 full_snapshot_archives_dir: impl AsRef<Path>,
2054) -> Vec<FullSnapshotArchiveInfo> {
2055 get_snapshot_archives(
2056 full_snapshot_archives_dir.as_ref(),
2057 FullSnapshotArchiveInfo::new_from_path,
2058 )
2059}
2060
2061pub fn get_incremental_snapshot_archives(
2063 incremental_snapshot_archives_dir: impl AsRef<Path>,
2064) -> Vec<IncrementalSnapshotArchiveInfo> {
2065 get_snapshot_archives(
2066 incremental_snapshot_archives_dir.as_ref(),
2067 IncrementalSnapshotArchiveInfo::new_from_path,
2068 )
2069}
2070
2071pub fn get_highest_full_snapshot_archive_slot(
2073 full_snapshot_archives_dir: impl AsRef<Path>,
2074) -> Option<Slot> {
2075 get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2076 .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2077}
2078
2079pub fn get_highest_incremental_snapshot_archive_slot(
2082 incremental_snapshot_archives_dir: impl AsRef<Path>,
2083 full_snapshot_slot: Slot,
2084) -> Option<Slot> {
2085 get_highest_incremental_snapshot_archive_info(
2086 incremental_snapshot_archives_dir,
2087 full_snapshot_slot,
2088 )
2089 .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2090}
2091
2092pub fn get_highest_full_snapshot_archive_info(
2094 full_snapshot_archives_dir: impl AsRef<Path>,
2095) -> Option<FullSnapshotArchiveInfo> {
2096 let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2097 full_snapshot_archives.sort_unstable();
2098 full_snapshot_archives.into_iter().next_back()
2099}
2100
2101pub fn get_highest_incremental_snapshot_archive_info(
2104 incremental_snapshot_archives_dir: impl AsRef<Path>,
2105 full_snapshot_slot: Slot,
2106) -> Option<IncrementalSnapshotArchiveInfo> {
2107 let mut incremental_snapshot_archives =
2111 get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2112 .into_iter()
2113 .filter(|incremental_snapshot_archive_info| {
2114 incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2115 })
2116 .collect::<Vec<_>>();
2117 incremental_snapshot_archives.sort_unstable();
2118 incremental_snapshot_archives.into_iter().next_back()
2119}
2120
2121pub fn purge_old_snapshot_archives(
2122 full_snapshot_archives_dir: impl AsRef<Path>,
2123 incremental_snapshot_archives_dir: impl AsRef<Path>,
2124 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2125 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2126) {
2127 info!(
2128 "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2129 full_snapshot_archives_dir.as_ref().display(),
2130 maximum_full_snapshot_archives_to_retain
2131 );
2132
2133 let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2134 full_snapshot_archives.sort_unstable();
2135 full_snapshot_archives.reverse();
2136
2137 let num_to_retain = full_snapshot_archives
2138 .len()
2139 .min(maximum_full_snapshot_archives_to_retain.get());
2140 trace!(
2141 "There are {} full snapshot archives, retaining {}",
2142 full_snapshot_archives.len(),
2143 num_to_retain,
2144 );
2145
2146 let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2147 if full_snapshot_archives.is_empty() {
2148 None
2149 } else {
2150 Some(full_snapshot_archives.split_at(num_to_retain))
2151 }
2152 .unwrap_or_default();
2153
2154 let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2155 .iter()
2156 .map(|ai| ai.slot())
2157 .collect::<HashSet<_>>();
2158
2159 fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2160 for path in archives.iter().map(|a| a.path()) {
2161 trace!("Removing snapshot archive: {}", path.display());
2162 let result = fs::remove_file(path);
2163 if let Err(err) = result {
2164 info!(
2165 "Failed to remove snapshot archive '{}': {err}",
2166 path.display()
2167 );
2168 }
2169 }
2170 }
2171 remove_archives(full_snapshot_archives_to_remove);
2172
2173 info!(
2174 "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2175 incremental_snapshot_archives_dir.as_ref().display(),
2176 maximum_incremental_snapshot_archives_to_retain
2177 );
2178 let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2179 for incremental_snapshot_archive in
2180 get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2181 {
2182 incremental_snapshot_archives_by_base_slot
2183 .entry(incremental_snapshot_archive.base_slot())
2184 .or_default()
2185 .push(incremental_snapshot_archive)
2186 }
2187
2188 let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2189 for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2190 {
2191 incremental_snapshot_archives.sort_unstable();
2192 let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2193 maximum_incremental_snapshot_archives_to_retain.get()
2194 } else {
2195 usize::from(retained_full_snapshot_slots.contains(&base_slot))
2196 };
2197 trace!(
2198 "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2199 incremental_snapshot_archives.len(),
2200 base_slot,
2201 incremental_snapshot_archives
2202 .len()
2203 .saturating_sub(num_to_retain),
2204 );
2205
2206 incremental_snapshot_archives.truncate(
2207 incremental_snapshot_archives
2208 .len()
2209 .saturating_sub(num_to_retain),
2210 );
2211 remove_archives(&incremental_snapshot_archives);
2212 }
2213}
2214
/// Unpacks the snapshot archive held in `shared_buffer` using `parallel_divisions` parallel
/// workers, then merges the per-worker maps into a single `UnpackedAppendVecMap`.
///
/// Worker `i` is given `ParallelSelector { index: i, divisions: parallel_divisions }`.
/// Presumably `hardened_unpack::unpack_snapshot` uses it to handle only that worker's share
/// of the archive entries (TODO: confirm in `hardened_unpack`).
///
/// # Panics
///
/// Panics if `parallel_divisions` is zero.
///
/// # Errors
///
/// Returns the error of the lowest-indexed worker that failed.
#[cfg(feature = "dev-context-only-utils")]
fn unpack_snapshot_local(
    shared_buffer: SharedBuffer,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_divisions > 0);

    // Every reader is created here, before any of them is handed to a worker.
    // NOTE(review): presumably required so each reader observes the buffer from its start;
    // confirm against `SharedBuffer` / `SharedBufferReader`.
    let mut readers = Vec::with_capacity(parallel_divisions);
    for _ in 0..parallel_divisions {
        readers.push(SharedBufferReader::new(&shared_buffer));
    }

    let worker_results: Vec<_> = readers
        .into_par_iter()
        .enumerate()
        .map(|(worker_index, reader)| {
            let selector = ParallelSelector {
                index: worker_index,
                divisions: parallel_divisions,
            };
            let mut tar_archive = Archive::new(reader);
            hardened_unpack::unpack_snapshot(
                &mut tar_archive,
                ledger_dir,
                account_paths,
                Some(selector),
            )
        })
        .collect();

    // Merge in worker order and stop at the first failure.
    let mut merged = UnpackedAppendVecMap::new();
    for worker_result in worker_results {
        merged.extend(worker_result?);
    }
    Ok(merged)
}
2255
2256fn untar_snapshot_create_shared_buffer(
2257 snapshot_tar: &Path,
2258 archive_format: ArchiveFormat,
2259) -> SharedBuffer {
2260 let open_file = || {
2261 fs::File::open(snapshot_tar)
2262 .map_err(|err| {
2263 IoError::other(format!(
2264 "failed to open snapshot archive '{}': {err}",
2265 snapshot_tar.display(),
2266 ))
2267 })
2268 .unwrap()
2269 };
2270 match archive_format {
2271 ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(BufReader::new(open_file()))),
2272 ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(BufReader::new(open_file()))),
2273 ArchiveFormat::TarZstd { .. } => SharedBuffer::new(
2274 zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
2275 ),
2276 ArchiveFormat::TarLz4 => {
2277 SharedBuffer::new(lz4::Decoder::new(BufReader::new(open_file())).unwrap())
2278 }
2279 ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
2280 }
2281}
2282
/// Decompresses the archive at `snapshot_tar` (per `archive_format`) and unpacks it into
/// `unpack_dir` / `account_paths` using `parallel_divisions` parallel workers.
#[cfg(feature = "dev-context-only-utils")]
fn untar_snapshot_in(
    snapshot_tar: impl AsRef<Path>,
    unpack_dir: &Path,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    let archive_path = snapshot_tar.as_ref();
    let decompressed = untar_snapshot_create_shared_buffer(archive_path, archive_format);
    unpack_snapshot_local(decompressed, unpack_dir, account_paths, parallel_divisions)
}
2294
2295pub fn verify_unpacked_snapshots_dir_and_version(
2296 unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2297) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2298 info!(
2299 "snapshot version: {}",
2300 &unpacked_snapshots_dir_and_version.snapshot_version
2301 );
2302
2303 let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2304 let mut bank_snapshots =
2305 get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2306 if bank_snapshots.len() > 1 {
2307 return Err(IoError::other(format!(
2308 "invalid snapshot format: only one snapshot allowed, but found {}",
2309 bank_snapshots.len(),
2310 ))
2311 .into());
2312 }
2313 let root_paths = bank_snapshots.pop().ok_or_else(|| {
2314 IoError::other(format!(
2315 "no snapshots found in snapshots directory '{}'",
2316 unpacked_snapshots_dir_and_version
2317 .unpacked_snapshots_dir
2318 .display(),
2319 ))
2320 })?;
2321 Ok((snapshot_version, root_paths))
2322}
2323
2324pub fn get_snapshot_file_name(slot: Slot) -> String {
2326 slot.to_string()
2327}
2328
2329pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2331 bank_snapshots_dir
2332 .as_ref()
2333 .join(get_snapshot_file_name(slot))
2334}
2335
/// Selects how `verify_snapshot_archive` checks the serialized bank file.
#[derive(Clone, Copy, Debug)]
pub enum VerifyBank {
    /// The serialized bank file is left in place and compared like every other file by the
    /// directory diff.
    Deterministic,
    /// The serialized bank files are compared with
    /// `serde_snapshot::compare_two_serialized_banks` instead. Both are then deleted so the
    /// directory diff does not compare their bytes.
    NonDeterministic,
}
2345
/// Asserts that the snapshot archive at `snapshot_archive` unpacks to the same contents as
/// the bank snapshot directory tree at `snapshots_to_verify` for `slot`.
///
/// The archive is unpacked, with a single parallel division, into a temporary directory.
/// Both trees are then normalized (see the inline comments) and compared with `dir_diff`.
/// The account storage files reachable through the accounts-hardlinks symlinks of
/// `snapshots_to_verify` are copied into one directory and compared against the unpacked
/// account files.
///
/// Note: this mutates `snapshots_to_verify`. It deletes the slot's accounts-hardlinks
/// directory, version file and state-complete file. For `VerifyBank::NonDeterministic` it
/// also deletes the serialized bank file.
///
/// # Panics
///
/// Panics on any I/O failure, or if either directory comparison finds a difference.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    // Unpack everything with a single parallel division (the trailing `1`).
    untar_snapshot_in(
        snapshot_archive,
        unpack_dir,
        &[unpack_account_dir.clone()],
        archive_format,
        1,
    )
    .unwrap();

    // Expected location of the archive's bank snapshot files after unpacking; this tree is
    // compared against `snapshots_to_verify` below.
    let unpacked_snapshots = unpack_dir.join("snapshots");

    // Flat collection of the account storage files referenced by `snapshots_to_verify`,
    // filled in from the accounts-hardlinks symlinks further down.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    fs::create_dir_all(&storages_to_verify).unwrap();

    let slot = slot.to_string();
    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);

    if let VerifyBank::NonDeterministic = verify_bank {
        // The serialized bank bytes may differ, so compare the banks through
        // `compare_two_serialized_banks`. Then delete both files so the directory diff
        // below does not compare them byte-for-byte.
        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
        let p2 = unpacked_snapshots.join(&slot).join(&slot);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
        fs::remove_file(p1).unwrap();
        fs::remove_file(p2).unwrap();
    }

    // After unpacking, the status cache file sits directly under `snapshots/`. Move it into
    // the slot directory, where a bank snapshot directory keeps it, so the layouts match.
    let existing_unpacked_status_cache_file =
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let new_unpacked_status_cache_file = unpacked_snapshots
        .join(&slot)
        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
    fs::rename(
        existing_unpacked_status_cache_file,
        new_unpacked_status_cache_file,
    )
    .unwrap();

    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        // Each entry is a symlink to a directory of account storage files. Copy those files
        // flat into `storages_to_verify`.
        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
            for entry in fs::read_dir(&link_dst_path).unwrap() {
                let src_path = entry.unwrap().path();
                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
                fs::copy(src_path, dst_path).unwrap();
            }
        }
        // Drop the hardlinks directory so it does not take part in the directory diff.
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    // Remove the version file from the slot directory (if present) before diffing.
    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_path.is_file() {
        fs::remove_file(version_path).unwrap();
    }

    // Likewise remove the state-complete marker file (if present).
    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_path.is_file() {
        fs::remove_file(state_complete_path).unwrap();
    }

    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // Best effort: remove an `accounts` subdirectory of the unpacked account dir if it exists
    // and is empty (`fs::remove_dir` only removes empty directories). Errors are ignored.
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    // Compare the collected account storage files against the unpacked ones.
    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
2439
2440pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2442 let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2443 purge_bank_snapshots(&bank_snapshots);
2444}
2445
2446pub fn purge_old_bank_snapshots(
2448 bank_snapshots_dir: impl AsRef<Path>,
2449 num_bank_snapshots_to_retain: usize,
2450 filter_by_kind: Option<BankSnapshotKind>,
2451) {
2452 let mut bank_snapshots = match filter_by_kind {
2453 Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2454 Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2455 None => get_bank_snapshots(&bank_snapshots_dir),
2456 };
2457
2458 bank_snapshots.sort_unstable();
2459 purge_bank_snapshots(
2460 bank_snapshots
2461 .iter()
2462 .rev()
2463 .skip(num_bank_snapshots_to_retain),
2464 );
2465}
2466
2467pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2472 purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2473 purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2474
2475 let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2476 if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2477 debug!(
2478 "Retained bank snapshot for slot {}, and purged the rest.",
2479 highest_bank_snapshot_post.slot
2480 );
2481 }
2482}
2483
2484pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2486 let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2487 bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2488 purge_bank_snapshots(&bank_snapshots);
2489}
2490
2491fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2495 for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2496 if purge_bank_snapshot(snapshot_dir).is_err() {
2497 warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2498 }
2499 }
2500}
2501
2502pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2504 const FN_ERR: &str = "failed to purge bank snapshot";
2505 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2506 if accounts_hardlinks_dir.is_dir() {
2507 let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2510 IoError::other(format!(
2511 "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2512 accounts_hardlinks_dir.display(),
2513 ))
2514 })?;
2515 for entry in read_dir {
2516 let accounts_hardlink_dir = entry?.path();
2517 let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2518 IoError::other(format!(
2519 "{FN_ERR}: failed to read symlink '{}': {err}",
2520 accounts_hardlink_dir.display(),
2521 ))
2522 })?;
2523 move_and_async_delete_path(&accounts_hardlink_dir);
2524 }
2525 }
2526 fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2527 IoError::other(format!(
2528 "{FN_ERR}: failed to remove dir '{}': {err}",
2529 bank_snapshot_dir.as_ref().display(),
2530 ))
2531 })?;
2532 Ok(())
2533}
2534
2535pub fn should_take_full_snapshot(
2536 block_height: Slot,
2537 full_snapshot_archive_interval_slots: Slot,
2538) -> bool {
2539 block_height % full_snapshot_archive_interval_slots == 0
2540}
2541
2542pub fn should_take_incremental_snapshot(
2543 block_height: Slot,
2544 incremental_snapshot_archive_interval_slots: Slot,
2545 latest_full_snapshot_slot: Option<Slot>,
2546) -> bool {
2547 block_height % incremental_snapshot_archive_interval_slots == 0
2548 && latest_full_snapshot_slot.is_some()
2549}
2550
/// Creates a temporary directory, sets up the accounts run/snapshot subdirectories inside it
/// via `create_accounts_run_and_snapshot_dirs`, and returns the `TempDir` guard together
/// with the first path that helper returns (presumably the accounts "run" directory).
///
/// Keep the returned `TempDir` alive while the path is in use, because dropping it deletes
/// the directory.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = TempDir::new().unwrap();
    let account_dirs = create_accounts_run_and_snapshot_dirs(tmp_dir.path()).unwrap();
    (tmp_dir, account_dirs.0)
}
2561
2562#[cfg(test)]
2563mod tests {
2564 use {
2565 super::*,
2566 assert_matches::assert_matches,
2567 bincode::{deserialize_from, serialize_into},
2568 std::{convert::TryFrom, mem::size_of},
2569 tempfile::NamedTempFile,
2570 };
2571
2572 #[test]
2573 fn test_serialize_snapshot_data_file_under_limit() {
2574 let temp_dir = tempfile::TempDir::new().unwrap();
2575 let expected_consumed_size = size_of::<u32>() as u64;
2576 let consumed_size = serialize_snapshot_data_file_capped(
2577 &temp_dir.path().join("data-file"),
2578 expected_consumed_size,
2579 |stream| {
2580 serialize_into(stream, &2323_u32)?;
2581 Ok(())
2582 },
2583 )
2584 .unwrap();
2585 assert_eq!(consumed_size, expected_consumed_size);
2586 }
2587
2588 #[test]
2589 fn test_serialize_snapshot_data_file_over_limit() {
2590 let temp_dir = tempfile::TempDir::new().unwrap();
2591 let expected_consumed_size = size_of::<u32>() as u64;
2592 let result = serialize_snapshot_data_file_capped(
2593 &temp_dir.path().join("data-file"),
2594 expected_consumed_size - 1,
2595 |stream| {
2596 serialize_into(stream, &2323_u32)?;
2597 Ok(())
2598 },
2599 );
2600 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2601 }
2602
2603 #[test]
2604 fn test_deserialize_snapshot_data_file_under_limit() {
2605 let expected_data = 2323_u32;
2606 let expected_consumed_size = size_of::<u32>() as u64;
2607
2608 let temp_dir = tempfile::TempDir::new().unwrap();
2609 serialize_snapshot_data_file_capped(
2610 &temp_dir.path().join("data-file"),
2611 expected_consumed_size,
2612 |stream| {
2613 serialize_into(stream, &expected_data)?;
2614 Ok(())
2615 },
2616 )
2617 .unwrap();
2618
2619 let snapshot_root_paths = SnapshotRootPaths {
2620 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2621 incremental_snapshot_root_file_path: None,
2622 };
2623
2624 let actual_data = deserialize_snapshot_data_files_capped(
2625 &snapshot_root_paths,
2626 expected_consumed_size,
2627 |stream| {
2628 Ok(deserialize_from::<_, u32>(
2629 &mut stream.full_snapshot_stream,
2630 )?)
2631 },
2632 )
2633 .unwrap();
2634 assert_eq!(actual_data, expected_data);
2635 }
2636
2637 #[test]
2638 fn test_deserialize_snapshot_data_file_over_limit() {
2639 let expected_data = 2323_u32;
2640 let expected_consumed_size = size_of::<u32>() as u64;
2641
2642 let temp_dir = tempfile::TempDir::new().unwrap();
2643 serialize_snapshot_data_file_capped(
2644 &temp_dir.path().join("data-file"),
2645 expected_consumed_size,
2646 |stream| {
2647 serialize_into(stream, &expected_data)?;
2648 Ok(())
2649 },
2650 )
2651 .unwrap();
2652
2653 let snapshot_root_paths = SnapshotRootPaths {
2654 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2655 incremental_snapshot_root_file_path: None,
2656 };
2657
2658 let result = deserialize_snapshot_data_files_capped(
2659 &snapshot_root_paths,
2660 expected_consumed_size - 1,
2661 |stream| {
2662 Ok(deserialize_from::<_, u32>(
2663 &mut stream.full_snapshot_stream,
2664 )?)
2665 },
2666 );
2667 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2668 }
2669
2670 #[test]
2671 fn test_deserialize_snapshot_data_file_extra_data() {
2672 let expected_data = 2323_u32;
2673 let expected_consumed_size = size_of::<u32>() as u64;
2674
2675 let temp_dir = tempfile::TempDir::new().unwrap();
2676 serialize_snapshot_data_file_capped(
2677 &temp_dir.path().join("data-file"),
2678 expected_consumed_size * 2,
2679 |stream| {
2680 serialize_into(stream.by_ref(), &expected_data)?;
2681 serialize_into(stream.by_ref(), &expected_data)?;
2682 Ok(())
2683 },
2684 )
2685 .unwrap();
2686
2687 let snapshot_root_paths = SnapshotRootPaths {
2688 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2689 incremental_snapshot_root_file_path: None,
2690 };
2691
2692 let result = deserialize_snapshot_data_files_capped(
2693 &snapshot_root_paths,
2694 expected_consumed_size * 2,
2695 |stream| {
2696 Ok(deserialize_from::<_, u32>(
2697 &mut stream.full_snapshot_stream,
2698 )?)
2699 },
2700 );
2701 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2702 }
2703
2704 #[test]
2705 fn test_snapshot_version_from_file_under_limit() {
2706 let file_content = SnapshotVersion::default().as_str();
2707 let mut file = NamedTempFile::new().unwrap();
2708 file.write_all(file_content.as_bytes()).unwrap();
2709 let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2710 assert_eq!(version_from_file, file_content);
2711 }
2712
2713 #[test]
2714 fn test_snapshot_version_from_file_over_limit() {
2715 let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2716 let file_content = vec![7u8; over_limit_size];
2717 let mut file = NamedTempFile::new().unwrap();
2718 file.write_all(&file_content).unwrap();
2719 assert_matches!(
2720 snapshot_version_from_file(file.path()),
2721 Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2722 );
2723 }
2724
2725 #[test]
2726 fn test_parse_full_snapshot_archive_filename() {
2727 assert_eq!(
2728 parse_full_snapshot_archive_filename(&format!(
2729 "snapshot-42-{}.tar.bz2",
2730 Hash::default()
2731 ))
2732 .unwrap(),
2733 (42, SnapshotHash(Hash::default()), ArchiveFormat::TarBzip2)
2734 );
2735 assert_eq!(
2736 parse_full_snapshot_archive_filename(&format!(
2737 "snapshot-43-{}.tar.zst",
2738 Hash::default()
2739 ))
2740 .unwrap(),
2741 (
2742 43,
2743 SnapshotHash(Hash::default()),
2744 ArchiveFormat::TarZstd {
2745 config: ZstdConfig::default(),
2746 }
2747 )
2748 );
2749 assert_eq!(
2750 parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default()))
2751 .unwrap(),
2752 (44, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2753 );
2754 assert_eq!(
2755 parse_full_snapshot_archive_filename(&format!(
2756 "snapshot-45-{}.tar.lz4",
2757 Hash::default()
2758 ))
2759 .unwrap(),
2760 (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2761 );
2762
2763 assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2764 assert!(
2765 parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2766 );
2767
2768 assert!(
2769 parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
2770 );
2771 assert!(parse_full_snapshot_archive_filename(&format!(
2772 "snapshot-12345678-{}.bad!ext",
2773 Hash::new_unique()
2774 ))
2775 .is_err());
2776 assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2777
2778 assert!(parse_full_snapshot_archive_filename(&format!(
2779 "snapshot-bad!slot-{}.bad!ext",
2780 Hash::new_unique()
2781 ))
2782 .is_err());
2783 assert!(parse_full_snapshot_archive_filename(&format!(
2784 "snapshot-12345678-{}.bad!ext",
2785 Hash::new_unique()
2786 ))
2787 .is_err());
2788 assert!(parse_full_snapshot_archive_filename(&format!(
2789 "snapshot-bad!slot-{}.tar",
2790 Hash::new_unique()
2791 ))
2792 .is_err());
2793
2794 assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err());
2795 assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2796 assert!(parse_full_snapshot_archive_filename(&format!(
2797 "snapshot-bad!slot-{}.tar",
2798 Hash::new_unique()
2799 ))
2800 .is_err());
2801 }
2802
2803 #[test]
2804 fn test_parse_incremental_snapshot_archive_filename() {
2805 assert_eq!(
2806 parse_incremental_snapshot_archive_filename(&format!(
2807 "incremental-snapshot-42-123-{}.tar.bz2",
2808 Hash::default()
2809 ))
2810 .unwrap(),
2811 (
2812 42,
2813 123,
2814 SnapshotHash(Hash::default()),
2815 ArchiveFormat::TarBzip2
2816 )
2817 );
2818 assert_eq!(
2819 parse_incremental_snapshot_archive_filename(&format!(
2820 "incremental-snapshot-43-234-{}.tar.zst",
2821 Hash::default()
2822 ))
2823 .unwrap(),
2824 (
2825 43,
2826 234,
2827 SnapshotHash(Hash::default()),
2828 ArchiveFormat::TarZstd {
2829 config: ZstdConfig::default(),
2830 }
2831 )
2832 );
2833 assert_eq!(
2834 parse_incremental_snapshot_archive_filename(&format!(
2835 "incremental-snapshot-44-345-{}.tar",
2836 Hash::default()
2837 ))
2838 .unwrap(),
2839 (44, 345, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2840 );
2841 assert_eq!(
2842 parse_incremental_snapshot_archive_filename(&format!(
2843 "incremental-snapshot-45-456-{}.tar.lz4",
2844 Hash::default()
2845 ))
2846 .unwrap(),
2847 (
2848 45,
2849 456,
2850 SnapshotHash(Hash::default()),
2851 ArchiveFormat::TarLz4
2852 )
2853 );
2854
2855 assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
2856 assert!(parse_incremental_snapshot_archive_filename(&format!(
2857 "snapshot-42-{}.tar",
2858 Hash::new_unique()
2859 ))
2860 .is_err());
2861 assert!(parse_incremental_snapshot_archive_filename(
2862 "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
2863 )
2864 .is_err());
2865
2866 assert!(parse_incremental_snapshot_archive_filename(&format!(
2867 "incremental-snapshot-bad!slot-56785678-{}.tar",
2868 Hash::new_unique()
2869 ))
2870 .is_err());
2871
2872 assert!(parse_incremental_snapshot_archive_filename(&format!(
2873 "incremental-snapshot-12345678-bad!slot-{}.tar",
2874 Hash::new_unique()
2875 ))
2876 .is_err());
2877
2878 assert!(parse_incremental_snapshot_archive_filename(
2879 "incremental-snapshot-12341234-56785678-bad!HASH.tar"
2880 )
2881 .is_err());
2882
2883 assert!(parse_incremental_snapshot_archive_filename(&format!(
2884 "incremental-snapshot-12341234-56785678-{}.bad!ext",
2885 Hash::new_unique()
2886 ))
2887 .is_err());
2888 }
2889
2890 #[test]
2891 fn test_check_are_snapshots_compatible() {
2892 let slot1: Slot = 1234;
2893 let slot2: Slot = 5678;
2894 let slot3: Slot = 999_999;
2895
2896 let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
2897 format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
2898 ))
2899 .unwrap();
2900
2901 assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
2902
2903 let incremental_snapshot_archive_info =
2904 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2905 "/dir/incremental-snapshot-{}-{}-{}.tar",
2906 slot1,
2907 slot2,
2908 Hash::new_unique()
2909 )))
2910 .unwrap();
2911
2912 assert!(check_are_snapshots_compatible(
2913 &full_snapshot_archive_info,
2914 Some(&incremental_snapshot_archive_info)
2915 )
2916 .is_ok());
2917
2918 let incremental_snapshot_archive_info =
2919 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2920 "/dir/incremental-snapshot-{}-{}-{}.tar",
2921 slot2,
2922 slot3,
2923 Hash::new_unique()
2924 )))
2925 .unwrap();
2926
2927 assert!(check_are_snapshots_compatible(
2928 &full_snapshot_archive_info,
2929 Some(&incremental_snapshot_archive_info)
2930 )
2931 .is_err());
2932 }
2933
2934 fn common_create_bank_snapshot_files(
2936 bank_snapshots_dir: &Path,
2937 min_slot: Slot,
2938 max_slot: Slot,
2939 ) {
2940 for slot in min_slot..max_slot {
2941 let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
2942 fs::create_dir_all(&snapshot_dir).unwrap();
2943
2944 let snapshot_filename = get_snapshot_file_name(slot);
2945 let snapshot_path = snapshot_dir.join(snapshot_filename);
2946 fs::File::create(snapshot_path).unwrap();
2947
2948 let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2949 fs::File::create(status_cache_file).unwrap();
2950
2951 let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
2952 fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2953
2954 let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
2956 fs::File::create(state_complete_path).unwrap();
2957 }
2958 }
2959
2960 #[test]
2961 fn test_get_bank_snapshots() {
2962 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2963 let min_slot = 10;
2964 let max_slot = 20;
2965 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2966
2967 let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
2968 assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
2969 }
2970
2971 #[test]
2972 fn test_get_highest_bank_snapshot_post() {
2973 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2974 let min_slot = 99;
2975 let max_slot = 123;
2976 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2977
2978 let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
2979 assert!(highest_bank_snapshot.is_some());
2980 assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
2981 }
2982
2983 fn common_create_snapshot_archive_files(
2989 full_snapshot_archives_dir: &Path,
2990 incremental_snapshot_archives_dir: &Path,
2991 min_full_snapshot_slot: Slot,
2992 max_full_snapshot_slot: Slot,
2993 min_incremental_snapshot_slot: Slot,
2994 max_incremental_snapshot_slot: Slot,
2995 ) {
2996 fs::create_dir_all(full_snapshot_archives_dir).unwrap();
2997 fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
2998 for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
2999 for incremental_snapshot_slot in
3000 min_incremental_snapshot_slot..max_incremental_snapshot_slot
3001 {
3002 let snapshot_filename = format!(
3003 "incremental-snapshot-{}-{}-{}.tar",
3004 full_snapshot_slot,
3005 incremental_snapshot_slot,
3006 Hash::default()
3007 );
3008 let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
3009 fs::File::create(snapshot_filepath).unwrap();
3010 }
3011
3012 let snapshot_filename =
3013 format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3014 let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3015 fs::File::create(snapshot_filepath).unwrap();
3016
3017 let bad_filename = format!(
3019 "incremental-snapshot-{}-{}-bad!hash.tar",
3020 full_snapshot_slot,
3021 max_incremental_snapshot_slot + 1,
3022 );
3023 let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3024 fs::File::create(bad_filepath).unwrap();
3025 }
3026
3027 let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
3030 let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3031 fs::File::create(bad_filepath).unwrap();
3032 }
3033
3034 #[test]
3035 fn test_get_full_snapshot_archives() {
3036 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3037 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3038 let min_slot = 123;
3039 let max_slot = 456;
3040 common_create_snapshot_archive_files(
3041 full_snapshot_archives_dir.path(),
3042 incremental_snapshot_archives_dir.path(),
3043 min_slot,
3044 max_slot,
3045 0,
3046 0,
3047 );
3048
3049 let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3050 assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3051 }
3052
3053 #[test]
3054 fn test_get_full_snapshot_archives_remote() {
3055 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3056 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3057 let min_slot = 123;
3058 let max_slot = 456;
3059 common_create_snapshot_archive_files(
3060 &full_snapshot_archives_dir
3061 .path()
3062 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3063 &incremental_snapshot_archives_dir
3064 .path()
3065 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3066 min_slot,
3067 max_slot,
3068 0,
3069 0,
3070 );
3071
3072 let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3073 assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3074 assert!(snapshot_archives.iter().all(|info| info.is_remote()));
3075 }
3076
3077 #[test]
3078 fn test_get_incremental_snapshot_archives() {
3079 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3080 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3081 let min_full_snapshot_slot = 12;
3082 let max_full_snapshot_slot = 23;
3083 let min_incremental_snapshot_slot = 34;
3084 let max_incremental_snapshot_slot = 45;
3085 common_create_snapshot_archive_files(
3086 full_snapshot_archives_dir.path(),
3087 incremental_snapshot_archives_dir.path(),
3088 min_full_snapshot_slot,
3089 max_full_snapshot_slot,
3090 min_incremental_snapshot_slot,
3091 max_incremental_snapshot_slot,
3092 );
3093
3094 let incremental_snapshot_archives =
3095 get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3096 assert_eq!(
3097 incremental_snapshot_archives.len() as Slot,
3098 (max_full_snapshot_slot - min_full_snapshot_slot)
3099 * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3100 );
3101 }
3102
3103 #[test]
3104 fn test_get_incremental_snapshot_archives_remote() {
3105 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3106 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3107 let min_full_snapshot_slot = 12;
3108 let max_full_snapshot_slot = 23;
3109 let min_incremental_snapshot_slot = 34;
3110 let max_incremental_snapshot_slot = 45;
3111 common_create_snapshot_archive_files(
3112 &full_snapshot_archives_dir
3113 .path()
3114 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3115 &incremental_snapshot_archives_dir
3116 .path()
3117 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3118 min_full_snapshot_slot,
3119 max_full_snapshot_slot,
3120 min_incremental_snapshot_slot,
3121 max_incremental_snapshot_slot,
3122 );
3123
3124 let incremental_snapshot_archives =
3125 get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3126 assert_eq!(
3127 incremental_snapshot_archives.len() as Slot,
3128 (max_full_snapshot_slot - min_full_snapshot_slot)
3129 * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3130 );
3131 assert!(incremental_snapshot_archives
3132 .iter()
3133 .all(|info| info.is_remote()));
3134 }
3135
3136 #[test]
3137 fn test_get_highest_full_snapshot_archive_slot() {
3138 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3139 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3140 let min_slot = 123;
3141 let max_slot = 456;
3142 common_create_snapshot_archive_files(
3143 full_snapshot_archives_dir.path(),
3144 incremental_snapshot_archives_dir.path(),
3145 min_slot,
3146 max_slot,
3147 0,
3148 0,
3149 );
3150
3151 assert_eq!(
3152 get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
3153 Some(max_slot - 1)
3154 );
3155 }
3156
#[test]
fn test_get_highest_incremental_snapshot_slot() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let (min_full_snapshot_slot, max_full_snapshot_slot) = (12, 23);
    let (min_incremental_snapshot_slot, max_incremental_snapshot_slot) = (34, 45);
    common_create_snapshot_archive_files(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        min_full_snapshot_slot,
        max_full_snapshot_slot,
        min_incremental_snapshot_slot,
        max_incremental_snapshot_slot,
    );

    let highest_incremental_slot_for = |full_snapshot_slot: Slot| {
        get_highest_incremental_snapshot_archive_slot(
            incremental_snapshot_archives_dir.path(),
            full_snapshot_slot,
        )
    };

    // Every full snapshot slot in the created range reports the same highest incremental slot.
    let expected = Some(max_incremental_snapshot_slot - 1);
    for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
        assert_eq!(highest_incremental_slot_for(full_snapshot_slot), expected);
    }

    // `max_full_snapshot_slot` lies just past the created range and has no incremental archive.
    assert_eq!(highest_incremental_slot_for(max_full_snapshot_slot), None);
}
3192
3193 fn common_test_purge_old_snapshot_archives(
3194 snapshot_names: &[&String],
3195 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3196 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3197 expected_snapshots: &[&String],
3198 ) {
3199 let temp_snap_dir = tempfile::TempDir::new().unwrap();
3200
3201 for snap_name in snapshot_names {
3202 let snap_path = temp_snap_dir.path().join(snap_name);
3203 let mut _snap_file = fs::File::create(snap_path);
3204 }
3205 purge_old_snapshot_archives(
3206 temp_snap_dir.path(),
3207 temp_snap_dir.path(),
3208 maximum_full_snapshot_archives_to_retain,
3209 maximum_incremental_snapshot_archives_to_retain,
3210 );
3211
3212 let mut retained_snaps = HashSet::new();
3213 for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3214 let entry_path_buf = entry.unwrap().path();
3215 let entry_path = entry_path_buf.as_path();
3216 let snapshot_name = entry_path
3217 .file_name()
3218 .unwrap()
3219 .to_str()
3220 .unwrap()
3221 .to_string();
3222 retained_snaps.insert(snapshot_name);
3223 }
3224
3225 for snap_name in expected_snapshots {
3226 assert!(
3227 retained_snaps.contains(snap_name.as_str()),
3228 "{snap_name} not found"
3229 );
3230 }
3231 assert_eq!(retained_snaps.len(), expected_snapshots.len());
3232 }
3233
#[test]
fn test_purge_old_full_snapshot_archives() {
    let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
    let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
    let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
    let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];

    // Retaining N full archives must keep the N archives with the highest slots.
    let cases = [
        (1, vec![&snap3_name]),
        (2, vec![&snap2_name, &snap3_name]),
        (3, vec![&snap1_name, &snap2_name, &snap3_name]),
    ];
    for (num_to_retain, expected_snapshots) in cases {
        common_test_purge_old_snapshot_archives(
            &snapshot_names,
            NonZeroUsize::new(num_to_retain).unwrap(),
            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
            &expected_snapshots,
        );
    }
}
3268
/// Purges repeatedly while new full snapshot archives keep being created. After each purge,
/// exactly `maximum_snapshots_to_retain` archives must remain, with consecutive slots ending at
/// the newest one.
#[test]
fn test_purge_old_full_snapshot_archives_in_the_loop() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
    let starting_slot: Slot = 42;
    let num_to_retain = maximum_snapshots_to_retain.get() as Slot;

    for slot in (starting_slot..).take(100) {
        let full_snapshot_archive_path = full_snapshot_archives_dir
            .path()
            .join(format!("snapshot-{}-{}.tar", slot, Hash::default()));
        fs::File::create(full_snapshot_archive_path).unwrap();

        // Hold off until at least `num_to_retain + 1` archives exist. After that, purge only
        // every `2 * num_to_retain` slots so that several new archives accumulate between purges.
        let too_early = slot < starting_slot + num_to_retain;
        let off_cycle = slot % (num_to_retain * 2) != 0;
        if too_early || off_cycle {
            continue;
        }

        purge_old_snapshot_archives(
            full_snapshot_archives_dir.path(),
            incremental_snapshot_archives_dir.path(),
            maximum_snapshots_to_retain,
            NonZeroUsize::new(usize::MAX).unwrap(),
        );

        let mut remaining_archives = get_full_snapshot_archives(full_snapshot_archives_dir.path());
        remaining_archives.sort_unstable();
        assert_eq!(remaining_archives.len(), maximum_snapshots_to_retain.get());
        assert_eq!(remaining_archives.last().unwrap().slot(), slot);
        // Walking from newest to oldest, slots must count down by one from `slot`.
        for (age, archive) in remaining_archives.iter().rev().enumerate() {
            assert_eq!(archive.slot(), slot - age as Slot);
        }
    }
}
3316
/// Checks the retention rules applied by `purge_old_snapshot_archives`:
/// - `maximum_full_snapshot_archives_to_retain` full archives survive;
/// - the latest full archive keeps its newest `maximum_incremental_snapshot_archives_to_retain`
///   incremental archives;
/// - every older retained full archive keeps exactly one incremental archive, its highest.
#[test]
fn test_purge_old_incremental_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let starting_slot = 100_000;

    let maximum_incremental_snapshot_archives_to_retain =
        DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
    let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;

    let incremental_snapshot_interval = 100;
    // Create twice as many incrementals per full snapshot as will be retained, so the purge
    // has something to remove.
    let num_incremental_snapshots_per_full_snapshot =
        maximum_incremental_snapshot_archives_to_retain.get() * 2;
    let full_snapshot_interval =
        incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;

    // Likewise, create twice as many full snapshots as will be retained.
    let num_full_snapshots = maximum_full_snapshot_archives_to_retain
        .checked_mul(NonZeroUsize::new(2).unwrap())
        .unwrap()
        .get();
    for full_snapshot_slot in (starting_slot..)
        .step_by(full_snapshot_interval)
        .take(num_full_snapshots)
    {
        let full_snapshot_path = full_snapshot_archives_dir
            .path()
            .join(format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default()));
        fs::File::create(full_snapshot_path).unwrap();

        // The first step lands on the full snapshot slot itself, so skip it.
        for incremental_snapshot_slot in (full_snapshot_slot..)
            .step_by(incremental_snapshot_interval)
            .take(num_incremental_snapshots_per_full_snapshot)
            .skip(1)
        {
            let incremental_snapshot_path = incremental_snapshot_archives_dir.path().join(format!(
                "incremental-snapshot-{}-{}-{}.tar",
                full_snapshot_slot,
                incremental_snapshot_slot,
                Hash::default()
            ));
            fs::File::create(incremental_snapshot_path).unwrap();
        }
    }

    purge_old_snapshot_archives(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        maximum_full_snapshot_archives_to_retain,
        maximum_incremental_snapshot_archives_to_retain,
    );

    // Only the newest full snapshot archives survive.
    let mut remaining_full_snapshot_archives =
        get_full_snapshot_archives(full_snapshot_archives_dir.path());
    assert_eq!(
        remaining_full_snapshot_archives.len(),
        maximum_full_snapshot_archives_to_retain.get(),
    );
    remaining_full_snapshot_archives.sort_unstable();
    let latest_full_snapshot_archive_slot =
        remaining_full_snapshot_archives.last().unwrap().slot();

    // The latest full snapshot keeps `maximum_incremental_snapshot_archives_to_retain`
    // incrementals; every other retained full snapshot keeps exactly one.
    let mut remaining_incremental_snapshot_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
    assert_eq!(
        remaining_incremental_snapshot_archives.len(),
        maximum_incremental_snapshot_archives_to_retain
            .get()
            .saturating_add(
                maximum_full_snapshot_archives_to_retain
                    .get()
                    .saturating_sub(1)
            )
    );
    // Sort descending so that `pop()` yields the smallest archive first. The loop below
    // expects that to be the one with the oldest base slot.
    remaining_incremental_snapshot_archives.sort_unstable();
    remaining_incremental_snapshot_archives.reverse();

    // Each older retained full snapshot keeps only its highest incremental archive.
    for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
        let incremental_snapshot_archive = remaining_incremental_snapshot_archives.pop().unwrap();

        let expected_base_slot =
            latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
        assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
        let expected_slot =
            expected_base_slot + (full_snapshot_interval - incremental_snapshot_interval) as u64;
        assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
    }

    // Every remaining archive must be based on the latest full snapshot...
    for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
        assert_eq!(
            incremental_snapshot_archive.base_slot(),
            latest_full_snapshot_archive_slot
        );
    }

    // ...and together they must be its newest
    // `maximum_incremental_snapshot_archives_to_retain` incrementals.
    let expected_remaining_incremental_snapshot_archive_slots =
        (latest_full_snapshot_archive_slot..)
            .step_by(incremental_snapshot_interval)
            .take(num_incremental_snapshots_per_full_snapshot)
            .skip(
                num_incremental_snapshots_per_full_snapshot
                    - maximum_incremental_snapshot_archives_to_retain.get(),
            )
            .collect::<HashSet<_>>();

    let actual_remaining_incremental_snapshot_archive_slots =
        remaining_incremental_snapshot_archives
            .iter()
            .map(|snapshot| snapshot.slot())
            .collect::<HashSet<_>>();
    assert_eq!(
        actual_remaining_incremental_snapshot_archive_slots,
        expected_remaining_incremental_snapshot_archive_slots
    );
}
3447
#[test]
fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();

    // Incremental archives based on full snapshot slots 100 and 200. No full snapshot
    // archive is ever created in this test.
    let incremental_snapshot_filenames = [
        format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
    ];
    incremental_snapshot_filenames
        .iter()
        .map(|filename| incremental_snapshot_archives_dir.path().join(filename))
        .for_each(|snapshot_path| {
            fs::File::create(snapshot_path).unwrap();
        });

    // Both retention limits are `usize::MAX`, yet every incremental archive is expected
    // to be removed.
    purge_old_snapshot_archives(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        NonZeroUsize::new(usize::MAX).unwrap(),
        NonZeroUsize::new(usize::MAX).unwrap(),
    );

    assert!(
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path()).is_empty()
    );
}
3480
#[test]
fn test_get_snapshot_accounts_hardlink_dir() {
    let slot: Slot = 1;
    let mut account_paths_set: HashSet<PathBuf> = HashSet::new();

    let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
    let accounts_hardlinks_dir = bank_snapshots_dir_tmp
        .path()
        .join(slot.to_string())
        .join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    fs::create_dir_all(&accounts_hardlinks_dir).unwrap();

    let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
    let appendvec_path = accounts_dir.join(format!("{slot}.0"));

    // A storage file placed directly inside the accounts dir is accepted.
    let result = get_snapshot_accounts_hardlink_dir(
        &appendvec_path,
        slot,
        &mut account_paths_set,
        &accounts_hardlinks_dir,
    );
    assert!(result.is_ok());

    // The same file name, moved one directory level higher, must be rejected with
    // `GetAccountPath`.
    let grandparent_dir = appendvec_path.parent().unwrap().parent().unwrap();
    let wrong_appendvec_path = grandparent_dir.join(appendvec_path.file_name().unwrap());
    let result = get_snapshot_accounts_hardlink_dir(
        &wrong_appendvec_path,
        slot,
        &mut account_paths_set,
        &accounts_hardlinks_dir,
    );
    assert_matches!(
        result,
        Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
    );
}
3522
#[test]
fn test_full_snapshot_slot_file_good() {
    let slot_written = 123_456_789;
    let bank_snapshot_dir = TempDir::new().unwrap();
    write_full_snapshot_slot_file(&bank_snapshot_dir, slot_written).unwrap();

    // Round trip: reading the file back must yield exactly the slot that was written.
    assert_eq!(
        read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap(),
        slot_written
    );
}
3532
#[test]
fn test_full_snapshot_slot_file_bad() {
    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();

    // Files one byte shorter and one byte longer than a `Slot` must both be rejected.
    for invalid_len in [SLOT_SIZE - 1, SLOT_SIZE + 1] {
        let contents = vec![1u8; invalid_len];
        let bank_snapshot_dir = TempDir::new().unwrap();
        fs::write(
            bank_snapshot_dir
                .path()
                .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME),
            &contents,
        )
        .unwrap();

        let err = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap_err();
        let message = err.to_string();
        assert!(message.starts_with("invalid full snapshot slot file size"));
    }
}
3552}