1use {
2 crate::{
3 bank::{BankFieldsToSerialize, BankSlotDelta},
4 serde_snapshot::{
5 self, BankIncrementalSnapshotPersistence, ExtraFieldsToSerialize, SnapshotStreams,
6 },
7 snapshot_archive_info::{
8 FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
9 SnapshotArchiveInfoGetter,
10 },
11 snapshot_bank_utils,
12 snapshot_config::SnapshotConfig,
13 snapshot_hash::SnapshotHash,
14 snapshot_package::{SnapshotKind, SnapshotPackage},
15 snapshot_utils::snapshot_storage_rebuilder::{
16 RebuiltSnapshotStorage, SnapshotStorageRebuilder,
17 },
18 },
19 bzip2::bufread::BzDecoder,
20 crossbeam_channel::Sender,
21 flate2::read::GzDecoder,
22 lazy_static::lazy_static,
23 log::*,
24 regex::Regex,
25 solana_accounts_db::{
26 account_storage::{meta::StoredMetaWriteVersion, AccountStorageMap},
27 accounts_db::{stats::BankHashStats, AccountStorageEntry, AtomicAccountsFileId},
28 accounts_file::{AccountsFile, AccountsFileError, InternalsForArchive, StorageAccess},
29 accounts_hash::{AccountsDeltaHash, AccountsHash},
30 epoch_accounts_hash::EpochAccountsHash,
31 hardened_unpack::{self, ParallelSelector, UnpackError},
32 shared_buffer_reader::{SharedBuffer, SharedBufferReader},
33 utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
34 },
35 solana_measure::{measure::Measure, measure_time, measure_us},
36 solana_sdk::{
37 clock::{Epoch, Slot},
38 hash::Hash,
39 },
40 std::{
41 cmp::Ordering,
42 collections::{HashMap, HashSet},
43 fmt, fs,
44 io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
45 mem,
46 num::NonZeroUsize,
47 ops::RangeInclusive,
48 path::{Path, PathBuf},
49 process::ExitStatus,
50 str::FromStr,
51 sync::Arc,
52 thread::{Builder, JoinHandle},
53 },
54 tar::{self, Archive},
55 tempfile::TempDir,
56 thiserror::Error,
57};
58#[cfg(feature = "dev-context-only-utils")]
59use {
60 hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
61 solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
62};
63
64mod archive_format;
65pub mod snapshot_storage_rebuilder;
66pub use archive_format::*;
67
/// Name of the status cache file inside a bank snapshot directory.
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
/// Name of the file that holds the snapshot version string.
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
/// Name of the marker file created as the last step of writing a bank snapshot directory;
/// a directory without it is treated as incomplete.
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
/// Name of the subdirectory of a bank snapshot directory whose entries are symlinks to
/// account storage hard-link directories.
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
/// Name of the directory for downloaded snapshot archives
/// (NOTE(review): not used in this part of the file — confirm where it is joined).
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
/// Name of the file recording the slot of the full snapshot that a bank snapshot is,
/// or is based on (stored as little-endian bytes).
pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
/// Size limit for snapshot data files: 32 GiB
/// (presumably enforced when deserializing; not used in this part of the file).
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024;
/// Size limit, in bytes, for the snapshot version file
/// (presumably enforced when reading it; not used in this part of the file).
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8;
/// Version string for `SnapshotVersion::V1_2_0`.
const VERSION_STRING_V1_2_0: &str = "1.2.0";
/// Prefix for temporary staging directories and archive files created while archiving;
/// entries with this prefix are removed by `remove_tmp_snapshot_archives`.
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
/// Extension of a bank snapshot file in the "pre" state.
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
/// Default number of full snapshot archives to retain.
// Built without `unsafe`: if the literal were ever zero, const evaluation would fail to compile.
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(2) {
        Some(count) => count,
        None => panic!("DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN must be non-zero"),
    };
/// Default number of incremental snapshot archives to retain.
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(4) {
        Some(count) => count,
        None => panic!("DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN must be non-zero"),
    };
/// Matches full snapshot archive file names: `snapshot-<slot>-<hash>.<ext>`.
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
/// Matches incremental snapshot archive file names:
/// `incremental-snapshot-<base slot>-<slot>-<hash>.<ext>`.
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
89
90#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
91pub enum SnapshotVersion {
92 #[default]
93 V1_2_0,
94}
95
96impl fmt::Display for SnapshotVersion {
97 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
98 f.write_str(From::from(*self))
99 }
100}
101
102impl From<SnapshotVersion> for &'static str {
103 fn from(snapshot_version: SnapshotVersion) -> &'static str {
104 match snapshot_version {
105 SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
106 }
107 }
108}
109
110impl FromStr for SnapshotVersion {
111 type Err = &'static str;
112
113 fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
114 let version_string = if version_string
116 .get(..1)
117 .map_or(false, |s| s.eq_ignore_ascii_case("v"))
118 {
119 &version_string[1..]
120 } else {
121 version_string
122 };
123 match version_string {
124 VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
125 _ => Err("unsupported snapshot version"),
126 }
127 }
128}
129
130impl SnapshotVersion {
131 pub fn as_str(self) -> &'static str {
132 <&str as From<Self>>::from(self)
133 }
134}
135
136#[derive(PartialEq, Eq, Debug)]
139pub struct BankSnapshotInfo {
140 pub slot: Slot,
142 pub snapshot_kind: BankSnapshotKind,
144 pub snapshot_dir: PathBuf,
146 pub snapshot_version: SnapshotVersion,
148}
149
150impl PartialOrd for BankSnapshotInfo {
151 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
152 Some(self.cmp(other))
153 }
154}
155
156impl Ord for BankSnapshotInfo {
158 fn cmp(&self, other: &Self) -> Ordering {
159 self.slot.cmp(&other.slot)
160 }
161}
162
163impl BankSnapshotInfo {
164 pub fn new_from_dir(
165 bank_snapshots_dir: impl AsRef<Path>,
166 slot: Slot,
167 ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
168 let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
171
172 if !bank_snapshot_dir.is_dir() {
173 return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
174 bank_snapshot_dir,
175 ));
176 }
177
178 if !is_bank_snapshot_complete(&bank_snapshot_dir) {
184 return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
185 }
186
187 let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
188 if !status_cache_file.is_file() {
189 return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
190 status_cache_file,
191 ));
192 }
193
194 let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
195 let version_str = snapshot_version_from_file(&version_path).or(Err(
196 SnapshotNewFromDirError::MissingVersionFile(version_path),
197 ))?;
198 let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
199 .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
200
201 let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
202 let bank_snapshot_pre_path =
203 bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
204
205 let snapshot_kind = if bank_snapshot_pre_path.is_file() {
214 BankSnapshotKind::Pre
215 } else if bank_snapshot_post_path.is_file() {
216 BankSnapshotKind::Post
217 } else {
218 return Err(SnapshotNewFromDirError::MissingSnapshotFile(
219 bank_snapshot_dir,
220 ));
221 };
222
223 Ok(BankSnapshotInfo {
224 slot,
225 snapshot_kind,
226 snapshot_dir: bank_snapshot_dir,
227 snapshot_version,
228 })
229 }
230
231 pub fn snapshot_path(&self) -> PathBuf {
232 let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
233
234 let ext = match self.snapshot_kind {
235 BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
236 BankSnapshotKind::Post => "",
237 };
238 bank_snapshot_path.set_extension(ext);
239
240 bank_snapshot_path
241 }
242}
243
/// State of the bank snapshot file inside a bank snapshot directory.
///
/// The state is determined by the file name: `BankSnapshotInfo::new_from_dir` reports `Pre`
/// when `<slot file>.pre` exists and otherwise `Post` when the extension-less file exists.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum BankSnapshotKind {
    /// The snapshot file carries the `BANK_SNAPSHOT_PRE_FILENAME_EXTENSION` ("pre") extension.
    Pre,
    /// The snapshot file has no extension.
    Post,
}

/// Where a snapshot is loaded from.
/// NOTE(review): not used in this part of the file; the variant meanings below are
/// inferred from their names — confirm at the call sites.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotFrom {
    /// Presumably: load from a snapshot archive
    Archive,
    /// Presumably: load from a bank snapshot directory
    Dir,
}
270
/// Paths of the snapshot root files for a full snapshot and, optionally, an incremental
/// snapshot built on top of it.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    pub full_snapshot_root_file_path: PathBuf,
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}

/// Output of unarchiving a snapshot (presumably produced by the unpack routines elsewhere
/// in this file — confirm).
#[derive(Debug)]
pub struct UnarchivedSnapshot {
    // Never read. It is held so the temporary unpack directory lives as long as this
    // value, because `TempDir` deletes its directory when dropped.
    #[allow(dead_code)]
    unpack_dir: TempDir,
    /// Account storages obtained from the archive
    pub storage: AccountStorageMap,
    /// Where the snapshot files were unpacked, and their version
    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    /// Timing of the untar step
    pub measure_untar: Measure,
}

/// A directory containing unpacked snapshot files, together with their snapshot version.
#[derive(Debug)]
pub struct UnpackedSnapshotsDirAndVersion {
    pub unpacked_snapshots_dir: PathBuf,
    pub snapshot_version: SnapshotVersion,
}

/// Account storages together with the next accounts file id to hand out.
pub(crate) struct StorageAndNextAccountsFileId {
    pub storage: AccountStorageMap,
    // NOTE(review): presumably the id one past the highest id in `storage` — confirm at
    // the construction site.
    pub next_append_vec_id: AtomicAccountsFileId,
}
302
/// Errors returned by the snapshot operations in this module.
#[derive(Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum SnapshotError {
    #[error("I/O error: {0}")]
    Io(#[from] IoError),

    #[error("AccountsFile error: {0}")]
    AccountsFileError(#[from] AccountsFileError),

    #[error("serialization error: {0}")]
    Serialize(#[from] bincode::Error),

    #[error("crossbeam send error: {0}")]
    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),

    #[error("archive generation failure {0}")]
    ArchiveGenerationFailure(ExitStatus),

    #[error("Unpack error: {0}")]
    UnpackError(#[from] UnpackError),

    /// An I/O error tagged with a static description of where it occurred.
    #[error("source({1}) - I/O error: {0}")]
    IoWithSource(IoError, &'static str),

    #[error("could not get file name from path '{0}'")]
    PathToFileNameError(PathBuf),

    #[error("could not get str from file name '{0}'")]
    FileNameToStrError(PathBuf),

    #[error("could not parse snapshot archive's file name '{0}'")]
    ParseSnapshotArchiveFileNameError(String),

    /// Fields: (full snapshot slot, incremental snapshot base slot).
    #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
    MismatchedBaseSlot(Slot, Slot),

    #[error("no snapshot archives to load from '{0}'")]
    NoSnapshotArchives(PathBuf),

    /// Fields: (slot and hash of the deserialized bank, slot and hash from the archive info).
    #[error("snapshot has mismatch: deserialized bank: {0:?}, snapshot archive info: {1:?}")]
    MismatchedSlotHash((Slot, SnapshotHash), (Slot, SnapshotHash)),

    #[error("snapshot slot deltas are invalid: {0}")]
    VerifySlotDeltas(#[from] VerifySlotDeltasError),

    #[error("snapshot epoch stakes are invalid: {0}")]
    VerifyEpochStakes(#[from] VerifyEpochStakesError),

    #[error("bank_snapshot_info new_from_dir failed: {0}")]
    NewFromDir(#[from] SnapshotNewFromDirError),

    #[error("invalid snapshot dir path '{0}'")]
    InvalidSnapshotDirPath(PathBuf),

    #[error("invalid AppendVec path '{0}'")]
    InvalidAppendVecPath(PathBuf),

    #[error("invalid account path '{0}'")]
    InvalidAccountPath(PathBuf),

    #[error("no valid snapshot dir found under '{0}'")]
    NoSnapshotSlotDir(PathBuf),

    #[error("snapshot dir account paths mismatching")]
    AccountPathsMismatch,

    /// Any failure inside `serialize_snapshot`, together with the slot being snapshotted.
    #[error("failed to add bank snapshot for slot {1}: {0}")]
    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),

    #[error("failed to archive snapshot package: {0}")]
    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),

    #[error("failed to rebuild snapshot storages: {0}")]
    RebuildStorages(String),
}

/// Reasons `BankSnapshotInfo::new_from_dir` can reject a bank snapshot directory.
#[derive(Error, Debug)]
pub enum SnapshotNewFromDirError {
    /// The path is not a directory.
    #[error("invalid bank snapshot directory '{0}'")]
    InvalidBankSnapshotDir(PathBuf),

    /// The status cache file is missing or is not a regular file.
    #[error("missing status cache file '{0}'")]
    MissingStatusCacheFile(PathBuf),

    /// The version file could not be read. This is reported for any read failure,
    /// not only a missing file.
    #[error("missing version file '{0}'")]
    MissingVersionFile(PathBuf),

    /// The version file's contents are not a supported `SnapshotVersion`.
    #[error("invalid snapshot version '{0}'")]
    InvalidVersion(String),

    /// The directory lacks the "state complete" marker file.
    #[error("snapshot directory incomplete '{0}'")]
    IncompleteDir(PathBuf),

    /// Neither the "pre" nor the "post" bank snapshot file exists in the directory.
    #[error("missing snapshot file '{0}'")]
    MissingSnapshotFile(PathBuf),
}

/// Result type used throughout this module.
pub type Result<T> = std::result::Result<T, SnapshotError>;
401
/// Reasons a snapshot's status cache slot deltas fail verification.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifySlotDeltasError {
    /// Fields: (number of entries, maximum allowed).
    #[error("too many entries: {0} (max: {1})")]
    TooManyEntries(usize, usize),

    #[error("slot {0} is not a root")]
    SlotIsNotRoot(Slot),

    /// Fields: (offending slot, bank slot).
    #[error("slot {0} is greater than bank slot {1}")]
    SlotGreaterThanMaxRoot(Slot, Slot),

    #[error("slot {0} has multiple entries")]
    SlotHasMultipleEntries(Slot),

    #[error("slot {0} was not found in slot history")]
    SlotNotFoundInHistory(Slot),

    #[error("slot {0} was in history but missing from slot deltas")]
    SlotNotFoundInDeltas(Slot),

    #[error("slot history is bad and cannot be used to verify slot deltas")]
    BadSlotHistory,
}

/// Reasons a snapshot's epoch stakes fail verification.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifyEpochStakesError {
    /// Fields: (offending epoch, maximum allowed epoch).
    #[error("epoch {0} is greater than the max {1}")]
    EpochGreaterThanMax(Epoch, Epoch),

    /// Fields: (missing epoch, range of epochs that must be present).
    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
    StakesNotFound(Epoch, RangeInclusive<Epoch>),
}
436
/// Failures while writing a bank snapshot directory in `serialize_snapshot`.
#[derive(Error, Debug)]
pub enum AddBankSnapshotError {
    #[error("bank snapshot dir already exists '{0}'")]
    SnapshotDirAlreadyExists(PathBuf),

    #[error("failed to create snapshot dir '{1}': {0}")]
    CreateSnapshotDir(#[source] IoError, PathBuf),

    #[error("failed to flush storage '{1}': {0}")]
    FlushStorage(#[source] AccountsFileError, PathBuf),

    #[error("failed to hard link storages: {0}")]
    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),

    #[error("failed to serialize bank: {0}")]
    SerializeBank(#[source] Box<SnapshotError>),

    #[error("failed to serialize status cache: {0}")]
    SerializeStatusCache(#[source] Box<SnapshotError>),

    #[error("failed to write snapshot version file '{1}': {0}")]
    WriteSnapshotVersionFile(#[source] IoError, PathBuf),

    #[error("failed to mark snapshot as 'complete': failed to create file '{1}': {0}")]
    CreateStateCompleteFile(#[source] IoError, PathBuf),
}

/// Failures while staging, encoding, and moving a snapshot archive.
/// Where two paths are present, they are (source, destination).
#[derive(Error, Debug)]
pub enum ArchiveSnapshotPackageError {
    #[error("failed to create archive path '{1}': {0}")]
    CreateArchiveDir(#[source] IoError, PathBuf),

    #[error("failed to create staging dir inside '{1}': {0}")]
    CreateStagingDir(#[source] IoError, PathBuf),

    #[error("failed to create snapshot staging dir '{1}': {0}")]
    CreateSnapshotStagingDir(#[source] IoError, PathBuf),

    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to create archive file '{1}': {0}")]
    CreateArchiveFile(#[source] IoError, PathBuf),

    #[error("failed to archive version file: {0}")]
    ArchiveVersionFile(#[source] IoError),

    #[error("failed to archive snapshots dir: {0}")]
    ArchiveSnapshotsDir(#[source] IoError),

    #[error("failed to archive account storage file '{1}': {0}")]
    ArchiveAccountStorageFile(#[source] IoError, PathBuf),

    #[error("failed to archive snapshot: {0}")]
    FinishArchive(#[source] IoError),

    #[error("failed to create encoder: {0}")]
    CreateEncoder(#[source] IoError),

    #[error("failed to encode archive: {0}")]
    FinishEncoder(#[source] IoError),

    #[error("failed to query archive metadata '{1}': {0}")]
    QueryArchiveMetadata(#[source] IoError, PathBuf),

    #[error("failed to move archive from '{1}' to '{2}': {0}")]
    MoveArchive(#[source] IoError, PathBuf, PathBuf),
}

/// Failures while hard linking account storages into a bank snapshot.
/// Where two paths are present, they are (source, destination).
#[derive(Error, Debug)]
pub enum HardLinkStoragesToSnapshotError {
    #[error("failed to create accounts hard links dir '{1}': {0}")]
    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),

    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),

    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
}

/// Failures while locating or creating the accounts hard-link directory for a snapshot.
#[derive(Error, Debug)]
pub enum GetSnapshotAccountsHardLinkDirError {
    #[error("invalid account storage path '{0}'")]
    GetAccountPath(PathBuf),

    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),

    /// Creating the symlink `link` -> `original` failed.
    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
    SymlinkSnapshotHardLinkDir {
        source: IoError,
        original: PathBuf,
        link: PathBuf,
    },
}
546
547pub fn clean_orphaned_account_snapshot_dirs(
555 bank_snapshots_dir: impl AsRef<Path>,
556 account_snapshot_paths: &[PathBuf],
557) -> IoResult<()> {
558 let mut account_snapshot_dirs_referenced = HashSet::new();
561 let snapshots = get_bank_snapshots(bank_snapshots_dir);
562 for snapshot in snapshots {
563 let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
564 let read_dir = fs::read_dir(&account_hardlinks_dir).map_err(|err| {
566 IoError::other(format!(
567 "failed to read account hardlinks dir '{}': {err}",
568 account_hardlinks_dir.display(),
569 ))
570 })?;
571 for entry in read_dir {
572 let path = entry?.path();
573 let target = fs::read_link(&path).map_err(|err| {
574 IoError::other(format!(
575 "failed to read symlink '{}': {err}",
576 path.display(),
577 ))
578 })?;
579 account_snapshot_dirs_referenced.insert(target);
580 }
581 }
582
583 for account_snapshot_path in account_snapshot_paths {
585 let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
586 IoError::other(format!(
587 "failed to read account snapshot dir '{}': {err}",
588 account_snapshot_path.display(),
589 ))
590 })?;
591 for entry in read_dir {
592 let path = entry?.path();
593 if !account_snapshot_dirs_referenced.contains(&path) {
594 info!(
595 "Removing orphaned account snapshot hardlink directory '{}'...",
596 path.display()
597 );
598 move_and_async_delete_path(&path);
599 }
600 }
601 }
602
603 Ok(())
604}
605
606pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
608 let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
609 return;
611 };
612
613 let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
614
615 let incomplete_dirs: Vec<_> = read_dir_iter
616 .filter_map(|entry| entry.ok())
617 .map(|entry| entry.path())
618 .filter(|path| path.is_dir())
619 .filter(is_incomplete)
620 .collect();
621
622 for incomplete_dir in incomplete_dirs {
624 let result = purge_bank_snapshot(&incomplete_dir);
625 match result {
626 Ok(_) => info!(
627 "Purged incomplete snapshot dir: {}",
628 incomplete_dir.display()
629 ),
630 Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
631 }
632 }
633}
634
635fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
637 let state_complete_path = bank_snapshot_dir
638 .as_ref()
639 .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
640 state_complete_path.is_file()
641}
642
643pub fn write_full_snapshot_slot_file(
645 bank_snapshot_dir: impl AsRef<Path>,
646 full_snapshot_slot: Slot,
647) -> IoResult<()> {
648 let full_snapshot_slot_path = bank_snapshot_dir
649 .as_ref()
650 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
651 fs::write(
652 &full_snapshot_slot_path,
653 Slot::to_le_bytes(full_snapshot_slot),
654 )
655 .map_err(|err| {
656 IoError::other(format!(
657 "failed to write full snapshot slot file '{}': {err}",
658 full_snapshot_slot_path.display(),
659 ))
660 })
661}
662
663pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<Slot> {
665 const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
666 let full_snapshot_slot_path = bank_snapshot_dir
667 .as_ref()
668 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
669 let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
670 if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
671 let error_message = format!(
672 "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
673 full_snapshot_slot_path.display(),
674 full_snapshot_slot_file_metadata.len(),
675 SLOT_SIZE,
676 );
677 return Err(IoError::other(error_message));
678 }
679 let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
680 let mut buffer = [0; SLOT_SIZE];
681 full_snapshot_slot_file.read_exact(&mut buffer)?;
682 let slot = Slot::from_le_bytes(buffer);
683 Ok(slot)
684}
685
686pub fn get_highest_loadable_bank_snapshot(
693 snapshot_config: &SnapshotConfig,
694) -> Option<BankSnapshotInfo> {
695 let highest_bank_snapshot =
696 get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
697
698 if !snapshot_config.should_generate_snapshots() {
701 return Some(highest_bank_snapshot);
702 }
703
704 let highest_full_snapshot_archive_slot =
707 get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
708 let full_snapshot_file_slot =
709 read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
710 (full_snapshot_file_slot == highest_full_snapshot_archive_slot).then_some(highest_bank_snapshot)
711}
712
713pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
716 if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
717 for entry in entries.flatten() {
718 if entry
719 .file_name()
720 .to_str()
721 .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
722 .unwrap_or(false)
723 {
724 let path = entry.path();
725 let result = if path.is_dir() {
726 fs::remove_dir_all(&path)
727 } else {
728 fs::remove_file(&path)
729 };
730 if let Err(err) = result {
731 warn!(
732 "Failed to remove temporary snapshot archive '{}': {err}",
733 path.display(),
734 );
735 }
736 }
737 }
738 }
739}
740
741pub fn serialize_and_archive_snapshot_package(
743 snapshot_package: SnapshotPackage,
744 snapshot_config: &SnapshotConfig,
745) -> Result<SnapshotArchiveInfo> {
746 let SnapshotPackage {
747 snapshot_kind,
748 slot: snapshot_slot,
749 block_height,
750 hash: snapshot_hash,
751 mut snapshot_storages,
752 status_cache_slot_deltas,
753 bank_fields_to_serialize,
754 bank_hash_stats,
755 accounts_delta_hash,
756 accounts_hash,
757 epoch_accounts_hash,
758 bank_incremental_snapshot_persistence,
759 write_version,
760 enqueued: _,
761 } = snapshot_package;
762
763 let bank_snapshot_info = serialize_snapshot(
764 &snapshot_config.bank_snapshots_dir,
765 snapshot_config.snapshot_version,
766 snapshot_storages.as_slice(),
767 status_cache_slot_deltas.as_slice(),
768 bank_fields_to_serialize,
769 bank_hash_stats,
770 accounts_delta_hash,
771 accounts_hash,
772 epoch_accounts_hash,
773 bank_incremental_snapshot_persistence.as_ref(),
774 write_version,
775 )?;
776
777 let full_snapshot_archive_slot = match snapshot_kind {
779 SnapshotKind::FullSnapshot => snapshot_slot,
780 SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
781 };
782 write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
783 .map_err(|err| {
784 IoError::other(format!(
785 "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, kind {snapshot_kind:?}: {err}",
786 ))
787 })?;
788
789 let snapshot_archive_path = match snapshot_package.snapshot_kind {
790 SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
791 &snapshot_config.full_snapshot_archives_dir,
792 snapshot_package.slot,
793 &snapshot_package.hash,
794 snapshot_config.archive_format,
795 ),
796 SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
797 snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
800 build_incremental_snapshot_archive_path(
801 &snapshot_config.incremental_snapshot_archives_dir,
802 incremental_snapshot_base_slot,
803 snapshot_package.slot,
804 &snapshot_package.hash,
805 snapshot_config.archive_format,
806 )
807 }
808 };
809
810 let snapshot_archive_info = archive_snapshot(
811 snapshot_kind,
812 snapshot_slot,
813 snapshot_hash,
814 snapshot_storages.as_slice(),
815 &bank_snapshot_info.snapshot_dir,
816 snapshot_archive_path,
817 snapshot_config.archive_format,
818 )?;
819
820 Ok(snapshot_archive_info)
821}
822
/// Writes a new bank snapshot directory for `bank_fields.slot` under `bank_snapshots_dir`.
///
/// The steps run in this order:
/// 1. create the bank snapshot directory, failing if it already exists;
/// 2. flush every storage in `snapshot_storages`;
/// 3. hard link the storages into the snapshot (`hard_link_storages_to_snapshot`);
/// 4. serialize the bank into the snapshot file;
/// 5. serialize the status cache;
/// 6. write the version file;
/// 7. finally, create the "state complete" marker file.
///
/// Sizes and per-step timings are reported with `datapoint_info!("snapshot_bank", ...)`.
///
/// The returned `BankSnapshotInfo` has `snapshot_kind: BankSnapshotKind::Pre`.
/// NOTE(review): the bank snapshot file is written without the "pre" extension, so
/// `BankSnapshotInfo::new_from_dir` would classify this directory as `Post`, and
/// `snapshot_path()` on the returned value points at a `.pre` file that was not written.
/// Confirm which callers rely on `snapshot_kind`.
///
/// # Errors
/// Every failure is wrapped as `SnapshotError::AddBankSnapshot(err, slot)`. A failure
/// part-way through leaves the directory without the "state complete" marker, so
/// `is_bank_snapshot_complete` treats it as incomplete.
#[allow(clippy::too_many_arguments)]
fn serialize_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    snapshot_version: SnapshotVersion,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    slot_deltas: &[BankSlotDelta],
    mut bank_fields: BankFieldsToSerialize,
    bank_hash_stats: BankHashStats,
    accounts_delta_hash: AccountsDeltaHash,
    accounts_hash: AccountsHash,
    epoch_accounts_hash: Option<EpochAccountsHash>,
    bank_incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
    write_version: StoredMetaWriteVersion,
) -> Result<BankSnapshotInfo> {
    let slot = bank_fields.slot;

    // All steps run inside this closure so that any error can be tagged with `slot` once,
    // at the bottom of the function.
    let do_serialize_snapshot = || {
        let mut measure_everything = Measure::start("");
        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
        if bank_snapshot_dir.exists() {
            return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
                bank_snapshot_dir,
            ));
        }
        fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
            AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
        })?;

        // The bank snapshot file is `<bank_snapshot_dir>/<snapshot file name for slot>`,
        // with no extension.
        let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
        info!(
            "Creating bank snapshot for slot {slot} at '{}'",
            bank_snapshot_path.display(),
        );

        // Flush each storage before it is hard linked into the snapshot.
        let (_, flush_storages_us) = measure_us!({
            for storage in snapshot_storages {
                storage.flush().map_err(|err| {
                    AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
                })?;
            }
        });

        let (_, hard_link_storages_us) = measure_us!(hard_link_storages_to_snapshot(
            &bank_snapshot_dir,
            slot,
            snapshot_storages
        )
        .map_err(AddBankSnapshotError::HardLinkStorages)?);

        let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
            // Move the versioned epoch stakes into `extra_fields`, leaving the default
            // (empty) value behind in `bank_fields`.
            let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
            let extra_fields = ExtraFieldsToSerialize {
                lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
                incremental_snapshot_persistence: bank_incremental_snapshot_persistence,
                epoch_accounts_hash,
                versioned_epoch_stakes,
            };
            serde_snapshot::serialize_bank_snapshot_into(
                stream,
                bank_fields,
                bank_hash_stats,
                accounts_delta_hash,
                accounts_hash,
                &get_storages_to_serialize(snapshot_storages),
                extra_fields,
                write_version,
            )?;
            Ok(())
        };
        let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
            serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
                .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
            "bank serialize"
        );

        let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
        let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
            snapshot_bank_utils::serialize_status_cache(slot_deltas, &status_cache_path)
                .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
        );

        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
        let (_, write_version_file_us) = measure_us!(fs::write(
            &version_path,
            snapshot_version.as_str().as_bytes(),
        )
        .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);

        // Must come last: the marker's presence is what makes the directory "complete".
        let state_complete_path = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
        let (_, write_state_complete_file_us) = measure_us!(fs::File::create(&state_complete_path)
            .map_err(|err| {
                AddBankSnapshotError::CreateStateCompleteFile(err, state_complete_path)
            })?);

        measure_everything.stop();

        datapoint_info!(
            "snapshot_bank",
            ("slot", slot, i64),
            ("bank_size", bank_snapshot_consumed_size, i64),
            ("status_cache_size", status_cache_consumed_size, i64),
            ("flush_storages_us", flush_storages_us, i64),
            ("hard_link_storages_us", hard_link_storages_us, i64),
            ("bank_serialize_us", bank_serialize.as_us(), i64),
            ("status_cache_serialize_us", status_cache_serialize_us, i64),
            ("write_version_file_us", write_version_file_us, i64),
            (
                "write_state_complete_file_us",
                write_state_complete_file_us,
                i64
            ),
            ("total_us", measure_everything.as_us(), i64),
        );

        info!(
            "{} for slot {} at {}",
            bank_serialize,
            slot,
            bank_snapshot_path.display(),
        );

        Ok(BankSnapshotInfo {
            slot,
            snapshot_kind: BankSnapshotKind::Pre,
            snapshot_dir: bank_snapshot_dir,
            snapshot_version,
        })
    };

    do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
}
964
965fn archive_snapshot(
967 snapshot_kind: SnapshotKind,
968 snapshot_slot: Slot,
969 snapshot_hash: SnapshotHash,
970 snapshot_storages: &[Arc<AccountStorageEntry>],
971 bank_snapshot_dir: impl AsRef<Path>,
972 archive_path: impl AsRef<Path>,
973 archive_format: ArchiveFormat,
974) -> Result<SnapshotArchiveInfo> {
975 use ArchiveSnapshotPackageError as E;
976 const SNAPSHOTS_DIR: &str = "snapshots";
977 const ACCOUNTS_DIR: &str = "accounts";
978 info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");
979
980 let mut timer = Measure::start("snapshot_package-package_snapshots");
981 let tar_dir = archive_path
982 .as_ref()
983 .parent()
984 .expect("Tar output path is invalid");
985
986 fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;
987
988 let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
990 let staging_dir = tempfile::Builder::new()
991 .prefix(&format!("{}{}-", staging_dir_prefix, snapshot_slot))
992 .tempdir_in(tar_dir)
993 .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
994 let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);
995
996 let slot_str = snapshot_slot.to_string();
997 let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
998 fs::create_dir_all(&staging_snapshot_dir)
1000 .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;
1001
1002 let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
1004 E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
1005 })?;
1006 let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
1007 let src_snapshot_file = src_snapshot_dir.join(slot_str);
1008 symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
1009 .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;
1010
1011 let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1014 let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1015 symlink::symlink_file(&src_status_cache, &staging_status_cache)
1016 .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;
1017
1018 let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
1020 let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1021 symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
1022 E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
1023 })?;
1024
1025 let staging_archive_path = tar_dir.join(format!(
1027 "{}{}.{}",
1028 staging_dir_prefix,
1029 snapshot_slot,
1030 archive_format.extension(),
1031 ));
1032
1033 {
1034 let mut archive_file = fs::File::create(&staging_archive_path)
1035 .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;
1036
1037 let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
1038 let mut archive = tar::Builder::new(encoder);
1039 archive
1042 .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
1043 .map_err(E::ArchiveVersionFile)?;
1044 archive
1045 .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
1046 .map_err(E::ArchiveSnapshotsDir)?;
1047
1048 for storage in snapshot_storages {
1049 let path_in_archive = Path::new(ACCOUNTS_DIR)
1050 .join(AccountsFile::file_name(storage.slot(), storage.id()));
1051 match storage.accounts.internals_for_archive() {
1052 InternalsForArchive::Mmap(data) => {
1053 let mut header = tar::Header::new_gnu();
1054 header.set_path(path_in_archive).map_err(|err| {
1055 E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
1056 })?;
1057 header.set_size(storage.capacity());
1058 header.set_cksum();
1059 archive.append(&header, data)
1060 }
1061 InternalsForArchive::FileIo(path) => {
1062 archive.append_path_with_name(path, path_in_archive)
1063 }
1064 }
1065 .map_err(|err| E::ArchiveAccountStorageFile(err, storage.path().to_path_buf()))?;
1066 }
1067
1068 archive.into_inner().map_err(E::FinishArchive)?;
1069 Ok(())
1070 };
1071
1072 match archive_format {
1073 ArchiveFormat::TarBzip2 => {
1074 let mut encoder =
1075 bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::best());
1076 do_archive_files(&mut encoder)?;
1077 encoder.finish().map_err(E::FinishEncoder)?;
1078 }
1079 ArchiveFormat::TarGzip => {
1080 let mut encoder =
1081 flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
1082 do_archive_files(&mut encoder)?;
1083 encoder.finish().map_err(E::FinishEncoder)?;
1084 }
1085 ArchiveFormat::TarZstd => {
1086 let mut encoder =
1088 zstd::stream::Encoder::new(archive_file, 1).map_err(E::CreateEncoder)?;
1089 do_archive_files(&mut encoder)?;
1090 encoder.finish().map_err(E::FinishEncoder)?;
1091 }
1092 ArchiveFormat::TarLz4 => {
1093 let mut encoder = lz4::EncoderBuilder::new()
1094 .level(1)
1095 .build(archive_file)
1096 .map_err(E::CreateEncoder)?;
1097 do_archive_files(&mut encoder)?;
1098 let (_output, result) = encoder.finish();
1099 result.map_err(E::FinishEncoder)?;
1100 }
1101 ArchiveFormat::Tar => {
1102 do_archive_files(&mut archive_file)?;
1103 }
1104 };
1105 }
1106
1107 let metadata = fs::metadata(&staging_archive_path)
1109 .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
1110 let archive_path = archive_path.as_ref().to_path_buf();
1111 fs::rename(&staging_archive_path, &archive_path)
1112 .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;
1113
1114 timer.stop();
1115 info!(
1116 "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
1117 archive_path.display(),
1118 snapshot_slot,
1119 timer.as_ms(),
1120 metadata.len()
1121 );
1122
1123 datapoint_info!(
1124 "archive-snapshot-package",
1125 ("slot", snapshot_slot, i64),
1126 ("archive_format", archive_format.to_string(), String),
1127 ("duration_ms", timer.as_ms(), i64),
1128 (
1129 if snapshot_kind.is_full_snapshot() {
1130 "full-snapshot-archive-size"
1131 } else {
1132 "incremental-snapshot-archive-size"
1133 },
1134 metadata.len(),
1135 i64
1136 ),
1137 );
1138 Ok(SnapshotArchiveInfo {
1139 path: archive_path,
1140 slot: snapshot_slot,
1141 hash: snapshot_hash,
1142 archive_format,
1143 })
1144}
1145
1146pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1148 let mut bank_snapshots = Vec::default();
1149 match fs::read_dir(&bank_snapshots_dir) {
1150 Err(err) => {
1151 info!(
1152 "Unable to read bank snapshots directory '{}': {err}",
1153 bank_snapshots_dir.as_ref().display(),
1154 );
1155 }
1156 Ok(paths) => paths
1157 .filter_map(|entry| {
1158 entry
1161 .ok()
1162 .filter(|entry| entry.path().is_dir())
1163 .and_then(|entry| {
1164 entry
1165 .path()
1166 .file_name()
1167 .and_then(|file_name| file_name.to_str())
1168 .and_then(|file_name| file_name.parse::<Slot>().ok())
1169 })
1170 })
1171 .for_each(
1172 |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1173 Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1174 Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1177 },
1178 ),
1179 }
1180 bank_snapshots
1181}
1182
1183pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1187 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1188 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1189 bank_snapshots
1190}
1191
1192pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1196 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1197 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1198 bank_snapshots
1199}
1200
1201pub fn get_highest_bank_snapshot_pre(
1205 bank_snapshots_dir: impl AsRef<Path>,
1206) -> Option<BankSnapshotInfo> {
1207 do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1208}
1209
1210pub fn get_highest_bank_snapshot_post(
1214 bank_snapshots_dir: impl AsRef<Path>,
1215) -> Option<BankSnapshotInfo> {
1216 do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1217}
1218
1219pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1223 do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1224}
1225
1226fn do_get_highest_bank_snapshot(
1227 mut bank_snapshots: Vec<BankSnapshotInfo>,
1228) -> Option<BankSnapshotInfo> {
1229 bank_snapshots.sort_unstable();
1230 bank_snapshots.into_iter().next_back()
1231}
1232
1233pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1234where
1235 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1236{
1237 serialize_snapshot_data_file_capped::<F>(
1238 data_file_path,
1239 MAX_SNAPSHOT_DATA_FILE_SIZE,
1240 serializer,
1241 )
1242}
1243
1244pub fn deserialize_snapshot_data_file<T: Sized>(
1245 data_file_path: &Path,
1246 deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1247) -> Result<T> {
1248 let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1249 deserializer(streams.full_snapshot_stream)
1250 };
1251
1252 let wrapped_data_file_path = SnapshotRootPaths {
1253 full_snapshot_root_file_path: data_file_path.to_path_buf(),
1254 incremental_snapshot_root_file_path: None,
1255 };
1256
1257 deserialize_snapshot_data_files_capped(
1258 &wrapped_data_file_path,
1259 MAX_SNAPSHOT_DATA_FILE_SIZE,
1260 wrapped_deserializer,
1261 )
1262}
1263
1264pub fn deserialize_snapshot_data_files<T: Sized>(
1265 snapshot_root_paths: &SnapshotRootPaths,
1266 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1267) -> Result<T> {
1268 deserialize_snapshot_data_files_capped(
1269 snapshot_root_paths,
1270 MAX_SNAPSHOT_DATA_FILE_SIZE,
1271 deserializer,
1272 )
1273}
1274
1275fn serialize_snapshot_data_file_capped<F>(
1276 data_file_path: &Path,
1277 maximum_file_size: u64,
1278 serializer: F,
1279) -> Result<u64>
1280where
1281 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1282{
1283 let data_file = fs::File::create(data_file_path)?;
1284 let mut data_file_stream = BufWriter::new(data_file);
1285 serializer(&mut data_file_stream)?;
1286 data_file_stream.flush()?;
1287
1288 let consumed_size = data_file_stream.stream_position()?;
1289 if consumed_size > maximum_file_size {
1290 let error_message = format!(
1291 "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1292 data_file_path.display(),
1293 );
1294 return Err(IoError::other(error_message).into());
1295 }
1296 Ok(consumed_size)
1297}
1298
1299fn deserialize_snapshot_data_files_capped<T: Sized>(
1300 snapshot_root_paths: &SnapshotRootPaths,
1301 maximum_file_size: u64,
1302 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1303) -> Result<T> {
1304 let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1305 create_snapshot_data_file_stream(
1306 &snapshot_root_paths.full_snapshot_root_file_path,
1307 maximum_file_size,
1308 )?;
1309
1310 let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1311 if let Some(ref incremental_snapshot_root_file_path) =
1312 snapshot_root_paths.incremental_snapshot_root_file_path
1313 {
1314 Some(create_snapshot_data_file_stream(
1315 incremental_snapshot_root_file_path,
1316 maximum_file_size,
1317 )?)
1318 } else {
1319 None
1320 }
1321 .unzip();
1322
1323 let mut snapshot_streams = SnapshotStreams {
1324 full_snapshot_stream: &mut full_snapshot_data_file_stream,
1325 incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1326 };
1327 let ret = deserializer(&mut snapshot_streams)?;
1328
1329 check_deserialize_file_consumed(
1330 full_snapshot_file_size,
1331 &snapshot_root_paths.full_snapshot_root_file_path,
1332 &mut full_snapshot_data_file_stream,
1333 )?;
1334
1335 if let Some(ref incremental_snapshot_root_file_path) =
1336 snapshot_root_paths.incremental_snapshot_root_file_path
1337 {
1338 check_deserialize_file_consumed(
1339 incremental_snapshot_file_size.unwrap(),
1340 incremental_snapshot_root_file_path,
1341 incremental_snapshot_data_file_stream.as_mut().unwrap(),
1342 )?;
1343 }
1344
1345 Ok(ret)
1346}
1347
1348fn create_snapshot_data_file_stream(
1351 snapshot_root_file_path: impl AsRef<Path>,
1352 maximum_file_size: u64,
1353) -> Result<(u64, BufReader<std::fs::File>)> {
1354 let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1355
1356 if snapshot_file_size > maximum_file_size {
1357 let error_message = format!(
1358 "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1359 snapshot_root_file_path.as_ref().display(),
1360 snapshot_file_size,
1361 maximum_file_size,
1362 );
1363 return Err(IoError::other(error_message).into());
1364 }
1365
1366 let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1367 let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1368
1369 Ok((snapshot_file_size, snapshot_data_file_stream))
1370}
1371
1372fn check_deserialize_file_consumed(
1375 file_size: u64,
1376 file_path: impl AsRef<Path>,
1377 file_stream: &mut BufReader<std::fs::File>,
1378) -> Result<()> {
1379 let consumed_size = file_stream.stream_position()?;
1380
1381 if consumed_size != file_size {
1382 let error_message = format!(
1383 "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1384 file_path.as_ref().display(),
1385 file_size,
1386 consumed_size,
1387 );
1388 return Err(IoError::other(error_message).into());
1389 }
1390
1391 Ok(())
1392}
1393
1394fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1396 let run_path = appendvec_path.parent()?;
1397 let run_file_name = run_path.file_name()?;
1398 if run_file_name != ACCOUNTS_RUN_DIR {
1401 error!(
1402 "The account path {} does not have run/ as its immediate parent directory.",
1403 run_path.display()
1404 );
1405 return None;
1406 }
1407 let account_path = run_path.parent()?;
1408 Some(account_path.to_path_buf())
1409}
1410
1411fn get_snapshot_accounts_hardlink_dir(
1414 appendvec_path: &Path,
1415 bank_slot: Slot,
1416 account_paths: &mut HashSet<PathBuf>,
1417 hardlinks_dir: impl AsRef<Path>,
1418) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1419 let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1420 GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1421 })?;
1422
1423 let snapshot_hardlink_dir = account_path
1424 .join(ACCOUNTS_SNAPSHOT_DIR)
1425 .join(bank_slot.to_string());
1426
1427 if !account_paths.contains(&account_path) {
1430 let idx = account_paths.len();
1431 debug!(
1432 "for appendvec_path {}, create hard-link path {}",
1433 appendvec_path.display(),
1434 snapshot_hardlink_dir.display()
1435 );
1436 fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1437 GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1438 err,
1439 snapshot_hardlink_dir.clone(),
1440 )
1441 })?;
1442 let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1443 symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1444 GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1445 source: err,
1446 original: snapshot_hardlink_dir.clone(),
1447 link: symlink_path,
1448 }
1449 })?;
1450 account_paths.insert(account_path);
1451 };
1452
1453 Ok(snapshot_hardlink_dir)
1454}
1455
1456pub fn hard_link_storages_to_snapshot(
1461 bank_snapshot_dir: impl AsRef<Path>,
1462 bank_slot: Slot,
1463 snapshot_storages: &[Arc<AccountStorageEntry>],
1464) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1465 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1466 fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1467 HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1468 err,
1469 accounts_hardlinks_dir.clone(),
1470 )
1471 })?;
1472
1473 let mut account_paths: HashSet<PathBuf> = HashSet::new();
1474 for storage in snapshot_storages {
1475 let storage_path = storage.accounts.path();
1476 let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1477 storage_path,
1478 bank_slot,
1479 &mut account_paths,
1480 &accounts_hardlinks_dir,
1481 )?;
1482 let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1485 let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1486 fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1487 HardLinkStoragesToSnapshotError::HardLinkStorage(
1488 err,
1489 storage_path.to_path_buf(),
1490 hard_link_path,
1491 )
1492 })?;
1493 }
1494 Ok(())
1495}
1496
1497pub(crate) fn get_storages_to_serialize(
1500 snapshot_storages: &[Arc<AccountStorageEntry>],
1501) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1502 snapshot_storages
1503 .iter()
1504 .map(|storage| vec![Arc::clone(storage)])
1505 .collect::<Vec<_>>()
1506}
1507
1508const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1510
1511pub fn verify_and_unarchive_snapshots(
1513 bank_snapshots_dir: impl AsRef<Path>,
1514 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1515 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1516 account_paths: &[PathBuf],
1517 storage_access: StorageAccess,
1518) -> Result<(
1519 UnarchivedSnapshot,
1520 Option<UnarchivedSnapshot>,
1521 AtomicAccountsFileId,
1522)> {
1523 check_are_snapshots_compatible(
1524 full_snapshot_archive_info,
1525 incremental_snapshot_archive_info,
1526 )?;
1527
1528 let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1529
1530 let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1531 let unarchived_full_snapshot = unarchive_snapshot(
1532 &bank_snapshots_dir,
1533 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1534 full_snapshot_archive_info.path(),
1535 "snapshot untar",
1536 account_paths,
1537 full_snapshot_archive_info.archive_format(),
1538 parallel_divisions,
1539 next_append_vec_id.clone(),
1540 storage_access,
1541 )?;
1542
1543 let unarchived_incremental_snapshot =
1544 if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1545 let unarchived_incremental_snapshot = unarchive_snapshot(
1546 &bank_snapshots_dir,
1547 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1548 incremental_snapshot_archive_info.path(),
1549 "incremental snapshot untar",
1550 account_paths,
1551 incremental_snapshot_archive_info.archive_format(),
1552 parallel_divisions,
1553 next_append_vec_id.clone(),
1554 storage_access,
1555 )?;
1556 Some(unarchived_incremental_snapshot)
1557 } else {
1558 None
1559 };
1560
1561 Ok((
1562 unarchived_full_snapshot,
1563 unarchived_incremental_snapshot,
1564 Arc::try_unwrap(next_append_vec_id).unwrap(),
1565 ))
1566}
1567
1568fn spawn_unpack_snapshot_thread(
1570 file_sender: Sender<PathBuf>,
1571 account_paths: Arc<Vec<PathBuf>>,
1572 ledger_dir: Arc<PathBuf>,
1573 mut archive: Archive<SharedBufferReader>,
1574 parallel_selector: Option<ParallelSelector>,
1575 thread_index: usize,
1576) -> JoinHandle<()> {
1577 Builder::new()
1578 .name(format!("solUnpkSnpsht{thread_index:02}"))
1579 .spawn(move || {
1580 hardened_unpack::streaming_unpack_snapshot(
1581 &mut archive,
1582 ledger_dir.as_path(),
1583 &account_paths,
1584 parallel_selector,
1585 &file_sender,
1586 )
1587 .unwrap();
1588 })
1589 .unwrap()
1590}
1591
1592fn streaming_unarchive_snapshot(
1594 file_sender: Sender<PathBuf>,
1595 account_paths: Vec<PathBuf>,
1596 ledger_dir: PathBuf,
1597 snapshot_archive_path: PathBuf,
1598 archive_format: ArchiveFormat,
1599 num_threads: usize,
1600) -> Vec<JoinHandle<()>> {
1601 let account_paths = Arc::new(account_paths);
1602 let ledger_dir = Arc::new(ledger_dir);
1603 let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
1604
1605 let archives: Vec<_> = (0..num_threads)
1607 .map(|_| {
1608 let reader = SharedBufferReader::new(&shared_buffer);
1609 Archive::new(reader)
1610 })
1611 .collect();
1612
1613 archives
1614 .into_iter()
1615 .enumerate()
1616 .map(|(thread_index, archive)| {
1617 let parallel_selector = Some(ParallelSelector {
1618 index: thread_index,
1619 divisions: num_threads,
1620 });
1621
1622 spawn_unpack_snapshot_thread(
1623 file_sender.clone(),
1624 account_paths.clone(),
1625 ledger_dir.clone(),
1626 archive,
1627 parallel_selector,
1628 thread_index,
1629 )
1630 })
1631 .collect()
1632}
1633
1634fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1639 let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1640 if !snapshots_dir.is_dir() {
1641 return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1642 }
1643
1644 let slot_dir = std::fs::read_dir(&snapshots_dir)
1646 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1647 .find(|entry| entry.as_ref().unwrap().path().is_dir())
1648 .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1649 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1650 .path();
1651
1652 let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1653 fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1654
1655 let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1656 fs::hard_link(
1657 status_cache_file,
1658 slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1659 )?;
1660
1661 let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
1662 fs::File::create(state_complete_file)?;
1663
1664 Ok(())
1665}
1666
1667fn unarchive_snapshot(
1671 bank_snapshots_dir: impl AsRef<Path>,
1672 unpacked_snapshots_dir_prefix: &'static str,
1673 snapshot_archive_path: impl AsRef<Path>,
1674 measure_name: &'static str,
1675 account_paths: &[PathBuf],
1676 archive_format: ArchiveFormat,
1677 parallel_divisions: usize,
1678 next_append_vec_id: Arc<AtomicAccountsFileId>,
1679 storage_access: StorageAccess,
1680) -> Result<UnarchivedSnapshot> {
1681 let unpack_dir = tempfile::Builder::new()
1682 .prefix(unpacked_snapshots_dir_prefix)
1683 .tempdir_in(bank_snapshots_dir)?;
1684 let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
1685
1686 let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1687 streaming_unarchive_snapshot(
1688 file_sender,
1689 account_paths.to_vec(),
1690 unpack_dir.path().to_path_buf(),
1691 snapshot_archive_path.as_ref().to_path_buf(),
1692 archive_format,
1693 parallel_divisions,
1694 );
1695
1696 let num_rebuilder_threads = num_cpus::get_physical()
1697 .saturating_sub(parallel_divisions)
1698 .max(1);
1699 let (version_and_storages, measure_untar) = measure_time!(
1700 SnapshotStorageRebuilder::rebuild_storage(
1701 file_receiver,
1702 num_rebuilder_threads,
1703 next_append_vec_id,
1704 SnapshotFrom::Archive,
1705 storage_access,
1706 )?,
1707 measure_name
1708 );
1709 info!("{}", measure_untar);
1710
1711 create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
1712
1713 let RebuiltSnapshotStorage {
1714 snapshot_version,
1715 storage,
1716 } = version_and_storages;
1717 Ok(UnarchivedSnapshot {
1718 unpack_dir,
1719 storage,
1720 unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
1721 unpacked_snapshots_dir,
1722 snapshot_version,
1723 },
1724 measure_untar,
1725 })
1726}
1727
1728fn streaming_snapshot_dir_files(
1731 file_sender: Sender<PathBuf>,
1732 snapshot_file_path: impl Into<PathBuf>,
1733 snapshot_version_path: impl Into<PathBuf>,
1734 account_paths: &[PathBuf],
1735) -> Result<()> {
1736 file_sender.send(snapshot_file_path.into())?;
1737 file_sender.send(snapshot_version_path.into())?;
1738
1739 for account_path in account_paths {
1740 for file in fs::read_dir(account_path)? {
1741 file_sender.send(file?.path())?;
1742 }
1743 }
1744
1745 Ok(())
1746}
1747
1748pub fn rebuild_storages_from_snapshot_dir(
1753 snapshot_info: &BankSnapshotInfo,
1754 account_paths: &[PathBuf],
1755 next_append_vec_id: Arc<AtomicAccountsFileId>,
1756 storage_access: StorageAccess,
1757) -> Result<AccountStorageMap> {
1758 let bank_snapshot_dir = &snapshot_info.snapshot_dir;
1759 let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1760 let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
1761
1762 let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
1763 IoError::other(format!(
1764 "failed to read accounts hardlinks dir '{}': {err}",
1765 accounts_hardlinks.display(),
1766 ))
1767 })?;
1768 for dir_entry in read_dir {
1769 let symlink_path = dir_entry?.path();
1770 let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
1773 IoError::other(format!(
1774 "failed to read symlink '{}': {err}",
1775 symlink_path.display(),
1776 ))
1777 })?;
1778 let account_run_path = account_snapshot_path
1779 .parent()
1780 .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1781 .parent()
1782 .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1783 .join(ACCOUNTS_RUN_DIR);
1784 if !account_run_paths.contains(&account_run_path) {
1785 return Err(SnapshotError::AccountPathsMismatch);
1788 }
1789 let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
1792 IoError::other(format!(
1793 "failed to read account snapshot dir '{}': {err}",
1794 account_snapshot_path.display(),
1795 ))
1796 })?;
1797 for file in read_dir {
1798 let file_path = file?.path();
1799 let file_name = file_path
1800 .file_name()
1801 .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
1802 let dest_path = account_run_path.join(file_name);
1803 fs::hard_link(&file_path, &dest_path).map_err(|err| {
1804 IoError::other(format!(
1805 "failed to hard link from '{}' to '{}': {err}",
1806 file_path.display(),
1807 dest_path.display(),
1808 ))
1809 })?;
1810 }
1811 }
1812
1813 let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1814 let snapshot_file_path = &snapshot_info.snapshot_path();
1815 let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1816 streaming_snapshot_dir_files(
1817 file_sender,
1818 snapshot_file_path,
1819 snapshot_version_path,
1820 account_paths,
1821 )?;
1822
1823 let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
1824 let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
1825 file_receiver,
1826 num_rebuilder_threads,
1827 next_append_vec_id,
1828 SnapshotFrom::Dir,
1829 storage_access,
1830 )?;
1831
1832 let RebuiltSnapshotStorage {
1833 snapshot_version: _,
1834 storage,
1835 } = version_and_storages;
1836 Ok(storage)
1837}
1838
1839fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
1843 let file_metadata = fs::metadata(&path).map_err(|err| {
1845 IoError::other(format!(
1846 "failed to query snapshot version file metadata '{}': {err}",
1847 path.as_ref().display(),
1848 ))
1849 })?;
1850 let file_size = file_metadata.len();
1851 if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1852 let error_message = format!(
1853 "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1854 path.as_ref().display(),
1855 file_size,
1856 MAX_SNAPSHOT_VERSION_FILE_SIZE,
1857 );
1858 return Err(IoError::other(error_message).into());
1859 }
1860
1861 let mut snapshot_version = String::new();
1863 let mut file = fs::File::open(&path).map_err(|err| {
1864 IoError::other(format!(
1865 "failed to open snapshot version file '{}': {err}",
1866 path.as_ref().display()
1867 ))
1868 })?;
1869 file.read_to_string(&mut snapshot_version).map_err(|err| {
1870 IoError::other(format!(
1871 "failed to read snapshot version from file '{}': {err}",
1872 path.as_ref().display()
1873 ))
1874 })?;
1875
1876 Ok(snapshot_version.trim().to_string())
1877}
1878
1879fn check_are_snapshots_compatible(
1882 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1883 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1884) -> Result<()> {
1885 if incremental_snapshot_archive_info.is_none() {
1886 return Ok(());
1887 }
1888
1889 let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1890
1891 (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1892 .then_some(())
1893 .ok_or_else(|| {
1894 SnapshotError::MismatchedBaseSlot(
1895 full_snapshot_archive_info.slot(),
1896 incremental_snapshot_archive_info.base_slot(),
1897 )
1898 })
1899}
1900
1901pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
1903 path.file_name()
1904 .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
1905 .to_str()
1906 .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
1907}
1908
1909pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
1910 snapshot_archives_dir
1911 .as_ref()
1912 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
1913}
1914
1915pub fn build_full_snapshot_archive_path(
1918 full_snapshot_archives_dir: impl AsRef<Path>,
1919 slot: Slot,
1920 hash: &SnapshotHash,
1921 archive_format: ArchiveFormat,
1922) -> PathBuf {
1923 full_snapshot_archives_dir.as_ref().join(format!(
1924 "snapshot-{}-{}.{}",
1925 slot,
1926 hash.0,
1927 archive_format.extension(),
1928 ))
1929}
1930
1931pub fn build_incremental_snapshot_archive_path(
1935 incremental_snapshot_archives_dir: impl AsRef<Path>,
1936 base_slot: Slot,
1937 slot: Slot,
1938 hash: &SnapshotHash,
1939 archive_format: ArchiveFormat,
1940) -> PathBuf {
1941 incremental_snapshot_archives_dir.as_ref().join(format!(
1942 "incremental-snapshot-{}-{}-{}.{}",
1943 base_slot,
1944 slot,
1945 hash.0,
1946 archive_format.extension(),
1947 ))
1948}
1949
1950pub(crate) fn parse_full_snapshot_archive_filename(
1952 archive_filename: &str,
1953) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
1954 lazy_static! {
1955 static ref RE: Regex = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1956 }
1957
1958 let do_parse = || {
1959 RE.captures(archive_filename).and_then(|captures| {
1960 let slot = captures
1961 .name("slot")
1962 .map(|x| x.as_str().parse::<Slot>())?
1963 .ok()?;
1964 let hash = captures
1965 .name("hash")
1966 .map(|x| x.as_str().parse::<Hash>())?
1967 .ok()?;
1968 let archive_format = captures
1969 .name("ext")
1970 .map(|x| x.as_str().parse::<ArchiveFormat>())?
1971 .ok()?;
1972
1973 Some((slot, SnapshotHash(hash), archive_format))
1974 })
1975 };
1976
1977 do_parse().ok_or_else(|| {
1978 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
1979 })
1980}
1981
1982pub(crate) fn parse_incremental_snapshot_archive_filename(
1984 archive_filename: &str,
1985) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
1986 lazy_static! {
1987 static ref RE: Regex = Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1988 }
1989
1990 let do_parse = || {
1991 RE.captures(archive_filename).and_then(|captures| {
1992 let base_slot = captures
1993 .name("base")
1994 .map(|x| x.as_str().parse::<Slot>())?
1995 .ok()?;
1996 let slot = captures
1997 .name("slot")
1998 .map(|x| x.as_str().parse::<Slot>())?
1999 .ok()?;
2000 let hash = captures
2001 .name("hash")
2002 .map(|x| x.as_str().parse::<Hash>())?
2003 .ok()?;
2004 let archive_format = captures
2005 .name("ext")
2006 .map(|x| x.as_str().parse::<ArchiveFormat>())?
2007 .ok()?;
2008
2009 Some((base_slot, slot, SnapshotHash(hash), archive_format))
2010 })
2011 };
2012
2013 do_parse().ok_or_else(|| {
2014 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2015 })
2016}
2017
2018fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2020where
2021 F: Fn(PathBuf) -> Result<T>,
2022{
2023 let walk_dir = |dir: &Path| -> Vec<T> {
2024 let entry_iter = fs::read_dir(dir);
2025 match entry_iter {
2026 Err(err) => {
2027 info!(
2028 "Unable to read snapshot archives directory '{}': {err}",
2029 dir.display(),
2030 );
2031 vec![]
2032 }
2033 Ok(entries) => entries
2034 .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2035 .collect(),
2036 }
2037 };
2038
2039 let mut ret = walk_dir(snapshot_archives_dir);
2040 let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2041 if remote_dir.exists() {
2042 ret.append(&mut walk_dir(remote_dir.as_ref()));
2043 }
2044 ret
2045}
2046
2047pub fn get_full_snapshot_archives(
2049 full_snapshot_archives_dir: impl AsRef<Path>,
2050) -> Vec<FullSnapshotArchiveInfo> {
2051 get_snapshot_archives(
2052 full_snapshot_archives_dir.as_ref(),
2053 FullSnapshotArchiveInfo::new_from_path,
2054 )
2055}
2056
2057pub fn get_incremental_snapshot_archives(
2059 incremental_snapshot_archives_dir: impl AsRef<Path>,
2060) -> Vec<IncrementalSnapshotArchiveInfo> {
2061 get_snapshot_archives(
2062 incremental_snapshot_archives_dir.as_ref(),
2063 IncrementalSnapshotArchiveInfo::new_from_path,
2064 )
2065}
2066
2067pub fn get_highest_full_snapshot_archive_slot(
2069 full_snapshot_archives_dir: impl AsRef<Path>,
2070) -> Option<Slot> {
2071 get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2072 .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2073}
2074
2075pub fn get_highest_incremental_snapshot_archive_slot(
2078 incremental_snapshot_archives_dir: impl AsRef<Path>,
2079 full_snapshot_slot: Slot,
2080) -> Option<Slot> {
2081 get_highest_incremental_snapshot_archive_info(
2082 incremental_snapshot_archives_dir,
2083 full_snapshot_slot,
2084 )
2085 .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2086}
2087
2088pub fn get_highest_full_snapshot_archive_info(
2090 full_snapshot_archives_dir: impl AsRef<Path>,
2091) -> Option<FullSnapshotArchiveInfo> {
2092 let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2093 full_snapshot_archives.sort_unstable();
2094 full_snapshot_archives.into_iter().next_back()
2095}
2096
2097pub fn get_highest_incremental_snapshot_archive_info(
2100 incremental_snapshot_archives_dir: impl AsRef<Path>,
2101 full_snapshot_slot: Slot,
2102) -> Option<IncrementalSnapshotArchiveInfo> {
2103 let mut incremental_snapshot_archives =
2107 get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2108 .into_iter()
2109 .filter(|incremental_snapshot_archive_info| {
2110 incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2111 })
2112 .collect::<Vec<_>>();
2113 incremental_snapshot_archives.sort_unstable();
2114 incremental_snapshot_archives.into_iter().next_back()
2115}
2116
2117pub fn purge_old_snapshot_archives(
2118 full_snapshot_archives_dir: impl AsRef<Path>,
2119 incremental_snapshot_archives_dir: impl AsRef<Path>,
2120 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2121 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2122) {
2123 info!(
2124 "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2125 full_snapshot_archives_dir.as_ref().display(),
2126 maximum_full_snapshot_archives_to_retain
2127 );
2128
2129 let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2130 full_snapshot_archives.sort_unstable();
2131 full_snapshot_archives.reverse();
2132
2133 let num_to_retain = full_snapshot_archives
2134 .len()
2135 .min(maximum_full_snapshot_archives_to_retain.get());
2136 trace!(
2137 "There are {} full snapshot archives, retaining {}",
2138 full_snapshot_archives.len(),
2139 num_to_retain,
2140 );
2141
2142 let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2143 if full_snapshot_archives.is_empty() {
2144 None
2145 } else {
2146 Some(full_snapshot_archives.split_at(num_to_retain))
2147 }
2148 .unwrap_or_default();
2149
2150 let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2151 .iter()
2152 .map(|ai| ai.slot())
2153 .collect::<HashSet<_>>();
2154
2155 fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2156 for path in archives.iter().map(|a| a.path()) {
2157 trace!("Removing snapshot archive: {}", path.display());
2158 let result = fs::remove_file(path);
2159 if let Err(err) = result {
2160 info!(
2161 "Failed to remove snapshot archive '{}': {err}",
2162 path.display()
2163 );
2164 }
2165 }
2166 }
2167 remove_archives(full_snapshot_archives_to_remove);
2168
2169 info!(
2170 "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2171 incremental_snapshot_archives_dir.as_ref().display(),
2172 maximum_incremental_snapshot_archives_to_retain
2173 );
2174 let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2175 for incremental_snapshot_archive in
2176 get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2177 {
2178 incremental_snapshot_archives_by_base_slot
2179 .entry(incremental_snapshot_archive.base_slot())
2180 .or_default()
2181 .push(incremental_snapshot_archive)
2182 }
2183
2184 let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2185 for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2186 {
2187 incremental_snapshot_archives.sort_unstable();
2188 let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2189 maximum_incremental_snapshot_archives_to_retain.get()
2190 } else {
2191 usize::from(retained_full_snapshot_slots.contains(&base_slot))
2192 };
2193 trace!(
2194 "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2195 incremental_snapshot_archives.len(),
2196 base_slot,
2197 incremental_snapshot_archives
2198 .len()
2199 .saturating_sub(num_to_retain),
2200 );
2201
2202 incremental_snapshot_archives.truncate(
2203 incremental_snapshot_archives
2204 .len()
2205 .saturating_sub(num_to_retain),
2206 );
2207 remove_archives(&incremental_snapshot_archives);
2208 }
2209}
2210
/// Unpacks the snapshot archive streamed through `shared_buffer` into `ledger_dir` and
/// `account_paths`, using `parallel_divisions` readers processed in parallel.
///
/// Each reader is handed a `ParallelSelector` carrying its index and the division count.
/// The per-division append-vec maps are merged into one map. If any division fails, the
/// error of the lowest-indexed failing division is returned.
///
/// # Panics
///
/// Panics if `parallel_divisions` is 0.
#[cfg(feature = "dev-context-only-utils")]
fn unpack_snapshot_local(
    shared_buffer: SharedBuffer,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_divisions > 0);

    // Every reader is attached to the shared buffer, sequentially, before any unpacking starts.
    // NOTE(review): presumably required so no reader misses data other readers already
    // consumed; confirm against SharedBuffer before moving reader creation into the workers.
    let mut readers = Vec::with_capacity(parallel_divisions);
    for _ in 0..parallel_divisions {
        readers.push(SharedBufferReader::new(&shared_buffer));
    }

    let per_division_results: Vec<_> = readers
        .into_par_iter()
        .enumerate()
        .map(|(index, reader)| {
            let selector = ParallelSelector {
                index,
                divisions: parallel_divisions,
            };
            hardened_unpack::unpack_snapshot(
                &mut Archive::new(reader),
                ledger_dir,
                account_paths,
                Some(selector),
            )
        })
        .collect();

    per_division_results.into_iter().try_fold(
        UnpackedAppendVecMap::new(),
        |mut merged, division| -> Result<UnpackedAppendVecMap> {
            merged.extend(division?);
            Ok(merged)
        },
    )
}
2251
2252fn untar_snapshot_create_shared_buffer(
2253 snapshot_tar: &Path,
2254 archive_format: ArchiveFormat,
2255) -> SharedBuffer {
2256 let open_file = || {
2257 fs::File::open(snapshot_tar)
2258 .map_err(|err| {
2259 IoError::other(format!(
2260 "failed to open snapshot archive '{}': {err}",
2261 snapshot_tar.display(),
2262 ))
2263 })
2264 .unwrap()
2265 };
2266 match archive_format {
2267 ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(BufReader::new(open_file()))),
2268 ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(BufReader::new(open_file()))),
2269 ArchiveFormat::TarZstd => SharedBuffer::new(
2270 zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
2271 ),
2272 ArchiveFormat::TarLz4 => {
2273 SharedBuffer::new(lz4::Decoder::new(BufReader::new(open_file())).unwrap())
2274 }
2275 ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
2276 }
2277}
2278
/// Decompresses the snapshot archive at `snapshot_tar` (of format `archive_format`) and
/// unpacks it into `unpack_dir` and `account_paths`, using `parallel_divisions` readers.
#[cfg(feature = "dev-context-only-utils")]
fn untar_snapshot_in(
    snapshot_tar: impl AsRef<Path>,
    unpack_dir: &Path,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    let archive_path = snapshot_tar.as_ref();
    unpack_snapshot_local(
        untar_snapshot_create_shared_buffer(archive_path, archive_format),
        unpack_dir,
        account_paths,
        parallel_divisions,
    )
}
2290
2291pub fn verify_unpacked_snapshots_dir_and_version(
2292 unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2293) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2294 info!(
2295 "snapshot version: {}",
2296 &unpacked_snapshots_dir_and_version.snapshot_version
2297 );
2298
2299 let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2300 let mut bank_snapshots =
2301 get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2302 if bank_snapshots.len() > 1 {
2303 return Err(IoError::other(format!(
2304 "invalid snapshot format: only one snapshot allowed, but found {}",
2305 bank_snapshots.len(),
2306 ))
2307 .into());
2308 }
2309 let root_paths = bank_snapshots.pop().ok_or_else(|| {
2310 IoError::other(format!(
2311 "no snapshots found in snapshots directory '{}'",
2312 unpacked_snapshots_dir_and_version
2313 .unpacked_snapshots_dir
2314 .display(),
2315 ))
2316 })?;
2317 Ok((snapshot_version, root_paths))
2318}
2319
2320pub fn get_snapshot_file_name(slot: Slot) -> String {
2322 slot.to_string()
2323}
2324
2325pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2327 bank_snapshots_dir
2328 .as_ref()
2329 .join(get_snapshot_file_name(slot))
2330}
2331
/// Selects how `verify_snapshot_archive` checks the serialized bank file.
#[derive(Clone, Copy, Debug)]
pub enum VerifyBank {
    /// The serialized bank file is compared byte-for-byte, as part of the directory diff.
    Deterministic,
    /// The serialized bank files are compared with
    /// `serde_snapshot::compare_two_serialized_banks` and then removed, so the directory
    /// diff skips them.
    NonDeterministic,
}
2341
/// Test helper that unpacks `snapshot_archive` into a temporary directory and asserts that
/// its contents match the bank snapshot for `slot` in `snapshots_to_verify`.
///
/// NOTE: this mutates `snapshots_to_verify`. It removes the bank snapshot's accounts
/// hardlinks directory, its version file and its state-complete file. For
/// [`VerifyBank::NonDeterministic`] it also removes the serialized bank file. The remaining
/// files are then compared with the unpacked archive via `dir_diff`.
///
/// # Panics
///
/// Panics on any I/O failure, or if the unpacked archive differs from the bank snapshot.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    // Account storages are unpacked into the first directory returned by
    // `create_accounts_run_and_snapshot_dirs` (presumably the "run" dir).
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    untar_snapshot_in(
        snapshot_archive,
        unpack_dir,
        &[unpack_account_dir.clone()],
        archive_format,
        1,
    )
    .unwrap();

    // The archive's bank snapshot files are expected under `<unpack_dir>/snapshots`.
    let unpacked_snapshots = unpack_dir.join("snapshots");

    // Storage files referenced by the bank snapshot's accounts hardlinks are copied here,
    // so they can be compared with the unpacked account storages at the end.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    fs::create_dir_all(&storages_to_verify).unwrap();

    let slot = slot.to_string();
    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);

    // In this mode the serialized banks are compared with compare_two_serialized_banks
    // rather than byte-for-byte. Both files are removed afterwards so dir_diff skips them.
    if let VerifyBank::NonDeterministic = verify_bank {
        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
        let p2 = unpacked_snapshots.join(&slot).join(&slot);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
        fs::remove_file(p1).unwrap();
        fs::remove_file(p2).unwrap();
    }

    // The archive holds the status cache directly under `snapshots/`, while a bank snapshot
    // directory holds it inside the slot directory. Move it so the two layouts line up.
    let existing_unpacked_status_cache_file =
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let new_unpacked_status_cache_file = unpacked_snapshots
        .join(&slot)
        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
    fs::rename(
        existing_unpacked_status_cache_file,
        new_unpacked_status_cache_file,
    )
    .unwrap();

    // Each entry in the accounts hardlinks directory is read as a symlink to a directory of
    // storage files. Copy those files into `storages_to_verify`, then delete the hardlinks
    // directory (presumably because the archive has no counterpart for it).
    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
            for entry in fs::read_dir(&link_dst_path).unwrap() {
                let src_path = entry.unwrap().path();
                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
                fs::copy(src_path, dst_path).unwrap();
            }
        }
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    // Drop the version and state-complete files from the bank snapshot before diffing.
    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_path.is_file() {
        fs::remove_file(version_path).unwrap();
    }

    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_path.is_file() {
        fs::remove_file(state_complete_path).unwrap();
    }

    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // Result deliberately ignored: the "accounts" subdirectory may not exist, and
    // `fs::remove_dir` only succeeds on an empty directory.
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
2435
2436pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2438 let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2439 purge_bank_snapshots(&bank_snapshots);
2440}
2441
2442pub fn purge_old_bank_snapshots(
2444 bank_snapshots_dir: impl AsRef<Path>,
2445 num_bank_snapshots_to_retain: usize,
2446 filter_by_kind: Option<BankSnapshotKind>,
2447) {
2448 let mut bank_snapshots = match filter_by_kind {
2449 Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2450 Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2451 None => get_bank_snapshots(&bank_snapshots_dir),
2452 };
2453
2454 bank_snapshots.sort_unstable();
2455 purge_bank_snapshots(
2456 bank_snapshots
2457 .iter()
2458 .rev()
2459 .skip(num_bank_snapshots_to_retain),
2460 );
2461}
2462
2463pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2468 purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2469 purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2470
2471 let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2472 if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2473 debug!(
2474 "Retained bank snapshot for slot {}, and purged the rest.",
2475 highest_bank_snapshot_post.slot
2476 );
2477 }
2478}
2479
2480pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2482 let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2483 bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2484 purge_bank_snapshots(&bank_snapshots);
2485}
2486
2487fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2491 for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2492 if purge_bank_snapshot(snapshot_dir).is_err() {
2493 warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2494 }
2495 }
2496}
2497
2498pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2500 const FN_ERR: &str = "failed to purge bank snapshot";
2501 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2502 if accounts_hardlinks_dir.is_dir() {
2503 let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2506 IoError::other(format!(
2507 "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2508 accounts_hardlinks_dir.display(),
2509 ))
2510 })?;
2511 for entry in read_dir {
2512 let accounts_hardlink_dir = entry?.path();
2513 let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2514 IoError::other(format!(
2515 "{FN_ERR}: failed to read symlink '{}': {err}",
2516 accounts_hardlink_dir.display(),
2517 ))
2518 })?;
2519 move_and_async_delete_path(&accounts_hardlink_dir);
2520 }
2521 }
2522 fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2523 IoError::other(format!(
2524 "{FN_ERR}: failed to remove dir '{}': {err}",
2525 bank_snapshot_dir.as_ref().display(),
2526 ))
2527 })?;
2528 Ok(())
2529}
2530
2531pub fn should_take_full_snapshot(
2532 block_height: Slot,
2533 full_snapshot_archive_interval_slots: Slot,
2534) -> bool {
2535 block_height % full_snapshot_archive_interval_slots == 0
2536}
2537
2538pub fn should_take_incremental_snapshot(
2539 block_height: Slot,
2540 incremental_snapshot_archive_interval_slots: Slot,
2541 latest_full_snapshot_slot: Option<Slot>,
2542) -> bool {
2543 block_height % incremental_snapshot_archive_interval_slots == 0
2544 && latest_full_snapshot_slot.is_some()
2545}
2546
/// Creates a temporary directory holding the accounts run/snapshot directory pair.
///
/// Returns the `TempDir` together with the first directory returned by
/// `create_accounts_run_and_snapshot_dirs`. Keep the `TempDir` alive for as long as the path
/// is used: dropping it deletes the directory.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = TempDir::new().unwrap();
    let (account_dir, ..) = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap();
    (tmp_dir, account_dir)
}
2557
2558#[cfg(test)]
2559mod tests {
2560 use {
2561 super::*,
2562 assert_matches::assert_matches,
2563 bincode::{deserialize_from, serialize_into},
2564 std::{convert::TryFrom, mem::size_of},
2565 tempfile::NamedTempFile,
2566 };
2567
2568 #[test]
2569 fn test_serialize_snapshot_data_file_under_limit() {
2570 let temp_dir = tempfile::TempDir::new().unwrap();
2571 let expected_consumed_size = size_of::<u32>() as u64;
2572 let consumed_size = serialize_snapshot_data_file_capped(
2573 &temp_dir.path().join("data-file"),
2574 expected_consumed_size,
2575 |stream| {
2576 serialize_into(stream, &2323_u32)?;
2577 Ok(())
2578 },
2579 )
2580 .unwrap();
2581 assert_eq!(consumed_size, expected_consumed_size);
2582 }
2583
2584 #[test]
2585 fn test_serialize_snapshot_data_file_over_limit() {
2586 let temp_dir = tempfile::TempDir::new().unwrap();
2587 let expected_consumed_size = size_of::<u32>() as u64;
2588 let result = serialize_snapshot_data_file_capped(
2589 &temp_dir.path().join("data-file"),
2590 expected_consumed_size - 1,
2591 |stream| {
2592 serialize_into(stream, &2323_u32)?;
2593 Ok(())
2594 },
2595 );
2596 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2597 }
2598
2599 #[test]
2600 fn test_deserialize_snapshot_data_file_under_limit() {
2601 let expected_data = 2323_u32;
2602 let expected_consumed_size = size_of::<u32>() as u64;
2603
2604 let temp_dir = tempfile::TempDir::new().unwrap();
2605 serialize_snapshot_data_file_capped(
2606 &temp_dir.path().join("data-file"),
2607 expected_consumed_size,
2608 |stream| {
2609 serialize_into(stream, &expected_data)?;
2610 Ok(())
2611 },
2612 )
2613 .unwrap();
2614
2615 let snapshot_root_paths = SnapshotRootPaths {
2616 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2617 incremental_snapshot_root_file_path: None,
2618 };
2619
2620 let actual_data = deserialize_snapshot_data_files_capped(
2621 &snapshot_root_paths,
2622 expected_consumed_size,
2623 |stream| {
2624 Ok(deserialize_from::<_, u32>(
2625 &mut stream.full_snapshot_stream,
2626 )?)
2627 },
2628 )
2629 .unwrap();
2630 assert_eq!(actual_data, expected_data);
2631 }
2632
2633 #[test]
2634 fn test_deserialize_snapshot_data_file_over_limit() {
2635 let expected_data = 2323_u32;
2636 let expected_consumed_size = size_of::<u32>() as u64;
2637
2638 let temp_dir = tempfile::TempDir::new().unwrap();
2639 serialize_snapshot_data_file_capped(
2640 &temp_dir.path().join("data-file"),
2641 expected_consumed_size,
2642 |stream| {
2643 serialize_into(stream, &expected_data)?;
2644 Ok(())
2645 },
2646 )
2647 .unwrap();
2648
2649 let snapshot_root_paths = SnapshotRootPaths {
2650 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2651 incremental_snapshot_root_file_path: None,
2652 };
2653
2654 let result = deserialize_snapshot_data_files_capped(
2655 &snapshot_root_paths,
2656 expected_consumed_size - 1,
2657 |stream| {
2658 Ok(deserialize_from::<_, u32>(
2659 &mut stream.full_snapshot_stream,
2660 )?)
2661 },
2662 );
2663 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2664 }
2665
2666 #[test]
2667 fn test_deserialize_snapshot_data_file_extra_data() {
2668 let expected_data = 2323_u32;
2669 let expected_consumed_size = size_of::<u32>() as u64;
2670
2671 let temp_dir = tempfile::TempDir::new().unwrap();
2672 serialize_snapshot_data_file_capped(
2673 &temp_dir.path().join("data-file"),
2674 expected_consumed_size * 2,
2675 |stream| {
2676 serialize_into(stream.by_ref(), &expected_data)?;
2677 serialize_into(stream.by_ref(), &expected_data)?;
2678 Ok(())
2679 },
2680 )
2681 .unwrap();
2682
2683 let snapshot_root_paths = SnapshotRootPaths {
2684 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2685 incremental_snapshot_root_file_path: None,
2686 };
2687
2688 let result = deserialize_snapshot_data_files_capped(
2689 &snapshot_root_paths,
2690 expected_consumed_size * 2,
2691 |stream| {
2692 Ok(deserialize_from::<_, u32>(
2693 &mut stream.full_snapshot_stream,
2694 )?)
2695 },
2696 );
2697 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2698 }
2699
2700 #[test]
2701 fn test_snapshot_version_from_file_under_limit() {
2702 let file_content = SnapshotVersion::default().as_str();
2703 let mut file = NamedTempFile::new().unwrap();
2704 file.write_all(file_content.as_bytes()).unwrap();
2705 let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2706 assert_eq!(version_from_file, file_content);
2707 }
2708
2709 #[test]
2710 fn test_snapshot_version_from_file_over_limit() {
2711 let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2712 let file_content = vec![7u8; over_limit_size];
2713 let mut file = NamedTempFile::new().unwrap();
2714 file.write_all(&file_content).unwrap();
2715 assert_matches!(
2716 snapshot_version_from_file(file.path()),
2717 Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2718 );
2719 }
2720
2721 #[test]
2722 fn test_parse_full_snapshot_archive_filename() {
2723 assert_eq!(
2724 parse_full_snapshot_archive_filename(&format!(
2725 "snapshot-42-{}.tar.bz2",
2726 Hash::default()
2727 ))
2728 .unwrap(),
2729 (42, SnapshotHash(Hash::default()), ArchiveFormat::TarBzip2)
2730 );
2731 assert_eq!(
2732 parse_full_snapshot_archive_filename(&format!(
2733 "snapshot-43-{}.tar.zst",
2734 Hash::default()
2735 ))
2736 .unwrap(),
2737 (43, SnapshotHash(Hash::default()), ArchiveFormat::TarZstd)
2738 );
2739 assert_eq!(
2740 parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default()))
2741 .unwrap(),
2742 (44, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2743 );
2744 assert_eq!(
2745 parse_full_snapshot_archive_filename(&format!(
2746 "snapshot-45-{}.tar.lz4",
2747 Hash::default()
2748 ))
2749 .unwrap(),
2750 (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2751 );
2752
2753 assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2754 assert!(
2755 parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2756 );
2757
2758 assert!(
2759 parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
2760 );
2761 assert!(parse_full_snapshot_archive_filename(&format!(
2762 "snapshot-12345678-{}.bad!ext",
2763 Hash::new_unique()
2764 ))
2765 .is_err());
2766 assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2767
2768 assert!(parse_full_snapshot_archive_filename(&format!(
2769 "snapshot-bad!slot-{}.bad!ext",
2770 Hash::new_unique()
2771 ))
2772 .is_err());
2773 assert!(parse_full_snapshot_archive_filename(&format!(
2774 "snapshot-12345678-{}.bad!ext",
2775 Hash::new_unique()
2776 ))
2777 .is_err());
2778 assert!(parse_full_snapshot_archive_filename(&format!(
2779 "snapshot-bad!slot-{}.tar",
2780 Hash::new_unique()
2781 ))
2782 .is_err());
2783
2784 assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err());
2785 assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2786 assert!(parse_full_snapshot_archive_filename(&format!(
2787 "snapshot-bad!slot-{}.tar",
2788 Hash::new_unique()
2789 ))
2790 .is_err());
2791 }
2792
2793 #[test]
2794 fn test_parse_incremental_snapshot_archive_filename() {
2795 assert_eq!(
2796 parse_incremental_snapshot_archive_filename(&format!(
2797 "incremental-snapshot-42-123-{}.tar.bz2",
2798 Hash::default()
2799 ))
2800 .unwrap(),
2801 (
2802 42,
2803 123,
2804 SnapshotHash(Hash::default()),
2805 ArchiveFormat::TarBzip2
2806 )
2807 );
2808 assert_eq!(
2809 parse_incremental_snapshot_archive_filename(&format!(
2810 "incremental-snapshot-43-234-{}.tar.zst",
2811 Hash::default()
2812 ))
2813 .unwrap(),
2814 (
2815 43,
2816 234,
2817 SnapshotHash(Hash::default()),
2818 ArchiveFormat::TarZstd
2819 )
2820 );
2821 assert_eq!(
2822 parse_incremental_snapshot_archive_filename(&format!(
2823 "incremental-snapshot-44-345-{}.tar",
2824 Hash::default()
2825 ))
2826 .unwrap(),
2827 (44, 345, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2828 );
2829 assert_eq!(
2830 parse_incremental_snapshot_archive_filename(&format!(
2831 "incremental-snapshot-45-456-{}.tar.lz4",
2832 Hash::default()
2833 ))
2834 .unwrap(),
2835 (
2836 45,
2837 456,
2838 SnapshotHash(Hash::default()),
2839 ArchiveFormat::TarLz4
2840 )
2841 );
2842
2843 assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
2844 assert!(parse_incremental_snapshot_archive_filename(&format!(
2845 "snapshot-42-{}.tar",
2846 Hash::new_unique()
2847 ))
2848 .is_err());
2849 assert!(parse_incremental_snapshot_archive_filename(
2850 "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
2851 )
2852 .is_err());
2853
2854 assert!(parse_incremental_snapshot_archive_filename(&format!(
2855 "incremental-snapshot-bad!slot-56785678-{}.tar",
2856 Hash::new_unique()
2857 ))
2858 .is_err());
2859
2860 assert!(parse_incremental_snapshot_archive_filename(&format!(
2861 "incremental-snapshot-12345678-bad!slot-{}.tar",
2862 Hash::new_unique()
2863 ))
2864 .is_err());
2865
2866 assert!(parse_incremental_snapshot_archive_filename(
2867 "incremental-snapshot-12341234-56785678-bad!HASH.tar"
2868 )
2869 .is_err());
2870
2871 assert!(parse_incremental_snapshot_archive_filename(&format!(
2872 "incremental-snapshot-12341234-56785678-{}.bad!ext",
2873 Hash::new_unique()
2874 ))
2875 .is_err());
2876 }
2877
2878 #[test]
2879 fn test_check_are_snapshots_compatible() {
2880 let slot1: Slot = 1234;
2881 let slot2: Slot = 5678;
2882 let slot3: Slot = 999_999;
2883
2884 let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
2885 format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
2886 ))
2887 .unwrap();
2888
2889 assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
2890
2891 let incremental_snapshot_archive_info =
2892 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2893 "/dir/incremental-snapshot-{}-{}-{}.tar",
2894 slot1,
2895 slot2,
2896 Hash::new_unique()
2897 )))
2898 .unwrap();
2899
2900 assert!(check_are_snapshots_compatible(
2901 &full_snapshot_archive_info,
2902 Some(&incremental_snapshot_archive_info)
2903 )
2904 .is_ok());
2905
2906 let incremental_snapshot_archive_info =
2907 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2908 "/dir/incremental-snapshot-{}-{}-{}.tar",
2909 slot2,
2910 slot3,
2911 Hash::new_unique()
2912 )))
2913 .unwrap();
2914
2915 assert!(check_are_snapshots_compatible(
2916 &full_snapshot_archive_info,
2917 Some(&incremental_snapshot_archive_info)
2918 )
2919 .is_err());
2920 }
2921
2922 fn common_create_bank_snapshot_files(
2924 bank_snapshots_dir: &Path,
2925 min_slot: Slot,
2926 max_slot: Slot,
2927 ) {
2928 for slot in min_slot..max_slot {
2929 let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
2930 fs::create_dir_all(&snapshot_dir).unwrap();
2931
2932 let snapshot_filename = get_snapshot_file_name(slot);
2933 let snapshot_path = snapshot_dir.join(snapshot_filename);
2934 fs::File::create(snapshot_path).unwrap();
2935
2936 let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2937 fs::File::create(status_cache_file).unwrap();
2938
2939 let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
2940 fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2941
2942 let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
2944 fs::File::create(state_complete_path).unwrap();
2945 }
2946 }
2947
2948 #[test]
2949 fn test_get_bank_snapshots() {
2950 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2951 let min_slot = 10;
2952 let max_slot = 20;
2953 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2954
2955 let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
2956 assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
2957 }
2958
2959 #[test]
2960 fn test_get_highest_bank_snapshot_post() {
2961 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2962 let min_slot = 99;
2963 let max_slot = 123;
2964 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2965
2966 let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
2967 assert!(highest_bank_snapshot.is_some());
2968 assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
2969 }
2970
2971 fn common_create_snapshot_archive_files(
2977 full_snapshot_archives_dir: &Path,
2978 incremental_snapshot_archives_dir: &Path,
2979 min_full_snapshot_slot: Slot,
2980 max_full_snapshot_slot: Slot,
2981 min_incremental_snapshot_slot: Slot,
2982 max_incremental_snapshot_slot: Slot,
2983 ) {
2984 fs::create_dir_all(full_snapshot_archives_dir).unwrap();
2985 fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
2986 for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
2987 for incremental_snapshot_slot in
2988 min_incremental_snapshot_slot..max_incremental_snapshot_slot
2989 {
2990 let snapshot_filename = format!(
2991 "incremental-snapshot-{}-{}-{}.tar",
2992 full_snapshot_slot,
2993 incremental_snapshot_slot,
2994 Hash::default()
2995 );
2996 let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
2997 fs::File::create(snapshot_filepath).unwrap();
2998 }
2999
3000 let snapshot_filename =
3001 format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3002 let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3003 fs::File::create(snapshot_filepath).unwrap();
3004
3005 let bad_filename = format!(
3007 "incremental-snapshot-{}-{}-bad!hash.tar",
3008 full_snapshot_slot,
3009 max_incremental_snapshot_slot + 1,
3010 );
3011 let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3012 fs::File::create(bad_filepath).unwrap();
3013 }
3014
3015 let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
3018 let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3019 fs::File::create(bad_filepath).unwrap();
3020 }
3021
3022 #[test]
3023 fn test_get_full_snapshot_archives() {
3024 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3025 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3026 let min_slot = 123;
3027 let max_slot = 456;
3028 common_create_snapshot_archive_files(
3029 full_snapshot_archives_dir.path(),
3030 incremental_snapshot_archives_dir.path(),
3031 min_slot,
3032 max_slot,
3033 0,
3034 0,
3035 );
3036
3037 let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3038 assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3039 }
3040
3041 #[test]
3042 fn test_get_full_snapshot_archives_remote() {
3043 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3044 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3045 let min_slot = 123;
3046 let max_slot = 456;
3047 common_create_snapshot_archive_files(
3048 &full_snapshot_archives_dir
3049 .path()
3050 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3051 &incremental_snapshot_archives_dir
3052 .path()
3053 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3054 min_slot,
3055 max_slot,
3056 0,
3057 0,
3058 );
3059
3060 let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3061 assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3062 assert!(snapshot_archives.iter().all(|info| info.is_remote()));
3063 }
3064
3065 #[test]
3066 fn test_get_incremental_snapshot_archives() {
3067 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3068 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3069 let min_full_snapshot_slot = 12;
3070 let max_full_snapshot_slot = 23;
3071 let min_incremental_snapshot_slot = 34;
3072 let max_incremental_snapshot_slot = 45;
3073 common_create_snapshot_archive_files(
3074 full_snapshot_archives_dir.path(),
3075 incremental_snapshot_archives_dir.path(),
3076 min_full_snapshot_slot,
3077 max_full_snapshot_slot,
3078 min_incremental_snapshot_slot,
3079 max_incremental_snapshot_slot,
3080 );
3081
3082 let incremental_snapshot_archives =
3083 get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3084 assert_eq!(
3085 incremental_snapshot_archives.len() as Slot,
3086 (max_full_snapshot_slot - min_full_snapshot_slot)
3087 * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3088 );
3089 }
3090
3091 #[test]
3092 fn test_get_incremental_snapshot_archives_remote() {
3093 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3094 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3095 let min_full_snapshot_slot = 12;
3096 let max_full_snapshot_slot = 23;
3097 let min_incremental_snapshot_slot = 34;
3098 let max_incremental_snapshot_slot = 45;
3099 common_create_snapshot_archive_files(
3100 &full_snapshot_archives_dir
3101 .path()
3102 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3103 &incremental_snapshot_archives_dir
3104 .path()
3105 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3106 min_full_snapshot_slot,
3107 max_full_snapshot_slot,
3108 min_incremental_snapshot_slot,
3109 max_incremental_snapshot_slot,
3110 );
3111
3112 let incremental_snapshot_archives =
3113 get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3114 assert_eq!(
3115 incremental_snapshot_archives.len() as Slot,
3116 (max_full_snapshot_slot - min_full_snapshot_slot)
3117 * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3118 );
3119 assert!(incremental_snapshot_archives
3120 .iter()
3121 .all(|info| info.is_remote()));
3122 }
3123
3124 #[test]
3125 fn test_get_highest_full_snapshot_archive_slot() {
3126 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3127 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3128 let min_slot = 123;
3129 let max_slot = 456;
3130 common_create_snapshot_archive_files(
3131 full_snapshot_archives_dir.path(),
3132 incremental_snapshot_archives_dir.path(),
3133 min_slot,
3134 max_slot,
3135 0,
3136 0,
3137 );
3138
3139 assert_eq!(
3140 get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
3141 Some(max_slot - 1)
3142 );
3143 }
3144
#[test]
fn test_get_highest_incremental_snapshot_slot() {
    let full_dir = tempfile::TempDir::new().unwrap();
    let incremental_dir = tempfile::TempDir::new().unwrap();
    let (min_full_slot, max_full_slot) = (12, 23);
    let (min_incremental_slot, max_incremental_slot) = (34, 45);
    common_create_snapshot_archive_files(
        full_dir.path(),
        incremental_dir.path(),
        min_full_slot,
        max_full_slot,
        min_incremental_slot,
        max_incremental_slot,
    );

    // Query helper: highest incremental archive slot built on top of `base_slot`.
    let highest_for = |base_slot: Slot| {
        get_highest_incremental_snapshot_archive_slot(incremental_dir.path(), base_slot)
    };

    // Every base slot below `max_full_slot` has incrementals up to `max_incremental_slot - 1`.
    for base_slot in min_full_slot..max_full_slot {
        assert_eq!(highest_for(base_slot), Some(max_incremental_slot - 1));
    }

    // No incrementals were created on top of `max_full_slot` itself.
    assert_eq!(highest_for(max_full_slot), None);
}
3180
3181 fn common_test_purge_old_snapshot_archives(
3182 snapshot_names: &[&String],
3183 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3184 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3185 expected_snapshots: &[&String],
3186 ) {
3187 let temp_snap_dir = tempfile::TempDir::new().unwrap();
3188
3189 for snap_name in snapshot_names {
3190 let snap_path = temp_snap_dir.path().join(snap_name);
3191 let mut _snap_file = fs::File::create(snap_path);
3192 }
3193 purge_old_snapshot_archives(
3194 temp_snap_dir.path(),
3195 temp_snap_dir.path(),
3196 maximum_full_snapshot_archives_to_retain,
3197 maximum_incremental_snapshot_archives_to_retain,
3198 );
3199
3200 let mut retained_snaps = HashSet::new();
3201 for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3202 let entry_path_buf = entry.unwrap().path();
3203 let entry_path = entry_path_buf.as_path();
3204 let snapshot_name = entry_path
3205 .file_name()
3206 .unwrap()
3207 .to_str()
3208 .unwrap()
3209 .to_string();
3210 retained_snaps.insert(snapshot_name);
3211 }
3212
3213 for snap_name in expected_snapshots {
3214 assert!(
3215 retained_snaps.contains(snap_name.as_str()),
3216 "{snap_name} not found"
3217 );
3218 }
3219 assert_eq!(retained_snaps.len(), expected_snapshots.len());
3220 }
3221
#[test]
fn test_purge_old_full_snapshot_archives() {
    let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
    let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
    let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());

    // Listed in ascending slot order (1, 3, 50). A purge that keeps `retain` full
    // archives should therefore leave exactly the last `retain` names.
    let snapshot_names = [&snap1_name, &snap2_name, &snap3_name];

    for retain in 1..=snapshot_names.len() {
        let expected_snapshots = &snapshot_names[snapshot_names.len() - retain..];
        common_test_purge_old_snapshot_archives(
            &snapshot_names,
            NonZeroUsize::new(retain).unwrap(),
            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
            expected_snapshots,
        );
    }
}
3256
#[test]
fn test_purge_old_full_snapshot_archives_in_the_loop() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
    let retain = maximum_snapshots_to_retain.get() as Slot;
    let starting_slot: Slot = 42;

    // Simulate 100 consecutive slots, each producing a new full snapshot archive.
    for slot in starting_slot..starting_slot + 100 {
        let archive_name = format!("snapshot-{}-{}.tar", slot, Hash::default());
        fs::File::create(full_snapshot_archives_dir.path().join(archive_name)).unwrap();

        // Purge only after `retain` archives exist, and then only on every
        // `2 * retain`-th slot, so extra archives accumulate between purges.
        let has_enough_archives = slot >= starting_slot + retain;
        let is_purge_slot = slot % (retain * 2) == 0;
        if !(has_enough_archives && is_purge_slot) {
            continue;
        }

        purge_old_snapshot_archives(
            &full_snapshot_archives_dir,
            &incremental_snapshot_archives_dir,
            maximum_snapshots_to_retain,
            NonZeroUsize::new(usize::MAX).unwrap(),
        );

        let mut survivors = get_full_snapshot_archives(&full_snapshot_archives_dir);
        survivors.sort_unstable();
        assert_eq!(survivors.len(), maximum_snapshots_to_retain.get());
        assert_eq!(survivors.last().unwrap().slot(), slot);

        // Walking newest-first, the survivors must be the consecutive slots ending at `slot`.
        for (survivor, expected_slot) in survivors.iter().rev().zip((0..=slot).rev()) {
            assert_eq!(survivor.slot(), expected_slot);
        }
    }
}
3304
/// Checks that purging keeps the configured number of full snapshot archives and keeps
/// the expected incremental archives.
///
/// The expected incrementals are:
/// - every retained incremental on top of the latest full snapshot, up to the
///   incremental retention limit;
/// - one incremental (the highest one created) on top of each older retained full snapshot.
#[test]
fn test_purge_old_incremental_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let starting_slot = 100_000;

    let maximum_incremental_snapshot_archives_to_retain =
        DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
    let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;

    // Create twice as many full snapshots as will be retained. On top of each, create
    // twice the incremental retention limit's worth of incremental slots, minus the
    // one that would coincide with the full slot.
    let incremental_snapshot_interval = 100;
    let num_incremental_snapshots_per_full_snapshot =
        maximum_incremental_snapshot_archives_to_retain.get() * 2;
    let full_snapshot_interval =
        incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
    let num_full_snapshots = maximum_full_snapshot_archives_to_retain
        .checked_mul(NonZeroUsize::new(2).unwrap())
        .unwrap()
        .get();

    // The previous version also pushed every filename into a `Vec` that was never
    // read. That dead bookkeeping is gone; only the files on disk matter.
    for full_snapshot_slot in (starting_slot..)
        .step_by(full_snapshot_interval)
        .take(num_full_snapshots)
    {
        let full_snapshot_path = full_snapshot_archives_dir
            .path()
            .join(format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default()));
        fs::File::create(full_snapshot_path).unwrap();

        // `skip(1)` drops the first step, which lands on the full snapshot slot itself.
        for incremental_snapshot_slot in (full_snapshot_slot..)
            .step_by(incremental_snapshot_interval)
            .take(num_incremental_snapshots_per_full_snapshot)
            .skip(1)
        {
            let incremental_snapshot_path = incremental_snapshot_archives_dir.path().join(
                format!(
                    "incremental-snapshot-{}-{}-{}.tar",
                    full_snapshot_slot,
                    incremental_snapshot_slot,
                    Hash::default()
                ),
            );
            fs::File::create(incremental_snapshot_path).unwrap();
        }
    }

    purge_old_snapshot_archives(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        maximum_full_snapshot_archives_to_retain,
        maximum_incremental_snapshot_archives_to_retain,
    );

    let mut remaining_full_snapshot_archives =
        get_full_snapshot_archives(full_snapshot_archives_dir.path());
    assert_eq!(
        remaining_full_snapshot_archives.len(),
        maximum_full_snapshot_archives_to_retain.get(),
    );
    remaining_full_snapshot_archives.sort_unstable();
    let latest_full_snapshot_archive_slot =
        remaining_full_snapshot_archives.last().unwrap().slot();

    let mut remaining_incremental_snapshot_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
    // Retention-limit incrementals on the latest full snapshot, plus one for each
    // other retained full snapshot.
    assert_eq!(
        remaining_incremental_snapshot_archives.len(),
        maximum_incremental_snapshot_archives_to_retain
            .get()
            .saturating_add(
                maximum_full_snapshot_archives_to_retain
                    .get()
                    .saturating_sub(1)
            )
    );
    // After sort + reverse, `pop()` yields the lowest-ordered archive first.
    remaining_incremental_snapshot_archives.sort_unstable();
    remaining_incremental_snapshot_archives.reverse();

    // Older retained full snapshots, oldest first: each keeps only its highest incremental.
    for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
        let incremental_snapshot_archive =
            remaining_incremental_snapshot_archives.pop().unwrap();

        let expected_base_slot =
            latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
        assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
        let expected_slot = expected_base_slot
            + (full_snapshot_interval - incremental_snapshot_interval) as u64;
        assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
    }

    // Whatever is left must be built on top of the latest full snapshot.
    for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
        assert_eq!(
            incremental_snapshot_archive.base_slot(),
            latest_full_snapshot_archive_slot
        );
    }

    // ...and must be exactly the newest `maximum_incremental_snapshot_archives_to_retain`
    // incremental slots for that base.
    let expected_remaining_incremental_snapshot_archive_slots =
        (latest_full_snapshot_archive_slot..)
            .step_by(incremental_snapshot_interval)
            .take(num_incremental_snapshots_per_full_snapshot)
            .skip(
                num_incremental_snapshots_per_full_snapshot
                    - maximum_incremental_snapshot_archives_to_retain.get(),
            )
            .collect::<HashSet<_>>();

    let actual_remaining_incremental_snapshot_archive_slots =
        remaining_incremental_snapshot_archives
            .iter()
            .map(|snapshot| snapshot.slot())
            .collect::<HashSet<_>>();
    assert_eq!(
        actual_remaining_incremental_snapshot_archive_slots,
        expected_remaining_incremental_snapshot_archive_slots
    );
}
3435
#[test]
fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();

    // Incremental archives based on full slots 100 and 200. No full archive is
    // created, so the full snapshot directory stays empty.
    let orphaned_incremental_archives = [
        format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
    ];
    orphaned_incremental_archives
        .iter()
        .map(|file_name| incremental_snapshot_archives_dir.path().join(file_name))
        .for_each(|path| {
            fs::File::create(path).unwrap();
        });

    // Retention limits are set to `usize::MAX`. Per this test's name, the incrementals
    // are expected to be purged because no full snapshot archive exists.
    purge_old_snapshot_archives(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        NonZeroUsize::new(usize::MAX).unwrap(),
        NonZeroUsize::new(usize::MAX).unwrap(),
    );

    assert!(
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path()).is_empty()
    );
}
3468
/// Checks that `get_snapshot_accounts_hardlink_dir` accepts an appendvec path inside the
/// accounts dir and rejects the same file name one directory level higher.
#[test]
fn test_get_snapshot_accounts_hardlink_dir() {
    let slot: Slot = 1;

    let mut account_paths_set: HashSet<PathBuf> = HashSet::new();

    let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
    let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
    let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    fs::create_dir_all(&accounts_hardlinks_dir).unwrap();

    let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
    let appendvec_filename = format!("{slot}.0");
    let appendvec_path = accounts_dir.join(appendvec_filename);

    // A path directly inside the accounts dir must succeed. `unwrap` replaces the former
    // `assert!(ret.is_ok())`, which hid the error value when this failed.
    get_snapshot_accounts_hardlink_dir(
        &appendvec_path,
        slot,
        &mut account_paths_set,
        &accounts_hardlinks_dir,
    )
    .unwrap();

    // Same file name, moved up one directory level (out of the accounts dir).
    let wrong_appendvec_path = appendvec_path
        .parent()
        .unwrap()
        .parent()
        .unwrap()
        .join(appendvec_path.file_name().unwrap());
    // Pass the hardlinks dir by reference here too, consistent with the first call.
    let ret = get_snapshot_accounts_hardlink_dir(
        &wrong_appendvec_path,
        slot,
        &mut account_paths_set,
        &accounts_hardlinks_dir,
    );

    assert_matches!(
        ret,
        Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
    );
}
3510
#[test]
fn test_full_snapshot_slot_file_good() {
    // Round trip: the slot written to the full-snapshot-slot file reads back unchanged.
    let bank_snapshot_dir = TempDir::new().unwrap();
    let expected_slot = 123_456_789;

    write_full_snapshot_slot_file(&bank_snapshot_dir, expected_slot).unwrap();
    assert_eq!(
        read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap(),
        expected_slot
    );
}
3520
#[test]
fn test_full_snapshot_slot_file_bad() {
    // A slot file one byte shorter or one byte longer than a `Slot` must be rejected.
    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
    let bad_sizes = [SLOT_SIZE - 1, SLOT_SIZE + 1];

    for size in bad_sizes {
        let bank_snapshot_dir = TempDir::new().unwrap();
        fs::write(
            bank_snapshot_dir
                .path()
                .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME),
            vec![1u8; size],
        )
        .unwrap();

        let message = read_full_snapshot_slot_file(&bank_snapshot_dir)
            .unwrap_err()
            .to_string();
        assert!(message.starts_with("invalid full snapshot slot file size"));
    }
}
3540}