1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use crate::{
deserialize_from, parse_append_vec_name, AccountsDbFields, AppendVec, AppendVecIterator,
DeserializableVersionedBank, ReadProgressTracking, Result, SerializableAccountStorageEntry,
SnapshotError, SnapshotExtractor, SNAPSHOTS_DIR,
};
use itertools::Itertools;
use log::info;
use solana_runtime::snapshot_utils::SNAPSHOT_STATUS_CACHE_FILENAME;
use std::fs::OpenOptions;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Instant;
pub struct UnpackedSnapshotExtractor {
root: PathBuf,
accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
}
impl SnapshotExtractor for UnpackedSnapshotExtractor {
fn iter(&mut self) -> AppendVecIterator<'_> {
Box::new(self.unboxed_iter())
}
}
impl UnpackedSnapshotExtractor {
pub fn open(path: &Path, progress_tracking: Box<dyn ReadProgressTracking>) -> Result<Self> {
let snapshots_dir = path.join(SNAPSHOTS_DIR);
let status_cache = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
if !status_cache.is_file() {
return Err(SnapshotError::NoStatusCache);
}
let snapshot_files = snapshots_dir.read_dir()?;
let snapshot_file_path = snapshot_files
.filter_map(|entry| entry.ok())
.find(|entry| u64::from_str(&entry.file_name().to_string_lossy()).is_ok())
.map(|entry| entry.path().join(entry.file_name()))
.ok_or(SnapshotError::NoSnapshotManifest)?;
info!("Opening snapshot manifest: {:?}", snapshot_file_path);
let snapshot_file = OpenOptions::new().read(true).open(&snapshot_file_path)?;
let snapshot_file_len = snapshot_file.metadata()?.len();
let snapshot_file = progress_tracking.new_read_progress_tracker(
&snapshot_file_path,
Box::new(snapshot_file),
snapshot_file_len,
);
let mut snapshot_file = BufReader::new(snapshot_file);
let pre_unpack = Instant::now();
let versioned_bank: DeserializableVersionedBank = deserialize_from(&mut snapshot_file)?;
drop(versioned_bank);
let versioned_bank_post_time = Instant::now();
let accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry> =
deserialize_from(&mut snapshot_file)?;
let accounts_db_fields_post_time = Instant::now();
drop(snapshot_file);
info!(
"Read bank fields in {:?}",
versioned_bank_post_time - pre_unpack
);
info!(
"Read accounts DB fields in {:?}",
accounts_db_fields_post_time - versioned_bank_post_time
);
Ok(UnpackedSnapshotExtractor {
root: path.to_path_buf(),
accounts_db_fields,
})
}
pub fn unboxed_iter(&self) -> impl Iterator<Item = Result<AppendVec>> + '_ {
std::iter::once(self.iter_streams())
.flatten_ok()
.flatten_ok()
}
fn iter_streams(&self) -> Result<impl Iterator<Item = Result<AppendVec>> + '_> {
let accounts_dir = self.root.join("accounts");
Ok(accounts_dir
.read_dir()?
.filter_map(|f| f.ok())
.filter_map(|f| {
let name = f.file_name();
parse_append_vec_name(&f.file_name()).map(move |parsed| (parsed, name))
})
.map(move |((slot, version), name)| {
self.open_append_vec(slot, version, &accounts_dir.join(name))
}))
}
fn open_append_vec(&self, slot: u64, id: u64, path: &Path) -> Result<AppendVec> {
let known_vecs = self
.accounts_db_fields
.0
.get(&slot)
.map(|v| &v[..])
.unwrap_or(&[]);
let known_vec = known_vecs.iter().find(|entry| entry.id == (id as usize));
let known_vec = match known_vec {
None => return Err(SnapshotError::UnexpectedAppendVec),
Some(v) => v,
};
Ok(AppendVec::new_from_file(
path,
known_vec.accounts_current_len,
)?)
}
}