1#[cfg(test)]
3use crate::pubkey_bins::PubkeyBinCalculator24;
4use {
5 crate::{accounts_hash::CalculateHashIntermediate, cache_hash_data_stats::CacheHashDataStats},
6 bytemuck_derive::{Pod, Zeroable},
7 memmap2::MmapMut,
8 solana_clock::Slot,
9 solana_measure::{measure::Measure, measure_us},
10 std::{
11 collections::HashSet,
12 fs::{self, remove_file, File, OpenOptions},
13 io::{Seek, SeekFrom, Write},
14 path::{Path, PathBuf},
15 sync::{atomic::Ordering, Arc, Mutex},
16 },
17};
18
/// The type of each data cell stored in a cache hash data file.
pub type EntryType = CalculateHashIntermediate;
/// Entries grouped per bin, in the (unsized) slice form accepted by `save`.
pub type SavedTypeSlice = [Vec<EntryType>];

/// Owned form of `SavedTypeSlice`; used by tests to accumulate loaded entries.
#[cfg(test)]
pub type SavedType = Vec<Vec<EntryType>>;
24
/// Header at the start of every cache hash data file; records how many
/// `EntryType` cells follow it.
#[repr(C)]
#[derive(Debug, Clone, Copy, Pod, Zeroable)]
pub struct Header {
    // number of entries stored immediately after this header
    pub count: usize,
}

// `Header` is read/written through bytemuck as raw bytes, so it must have no
// padding; pinning its size to u64 guarantees that on supported targets.
const _: () = assert!(
    std::mem::size_of::<Header>() == std::mem::size_of::<u64>(),
    "Header cannot have any padding and must be the same size as u64",
);
40
/// An opened cache file plus the metadata needed to mmap and validate it later
/// via `map()`.
pub(crate) struct CacheHashDataFileReference {
    file: File,
    // length on disk; validated against the header's entry count in `map()`
    file_len: u64,
    // retained only for diagnostics in assert/error messages
    path: PathBuf,
    stats: Arc<CacheHashDataStats>,
}
48
/// A memory-mapped cache hash data file: a `Header` followed by `count` cells
/// of `EntryType`.
pub(crate) struct CacheHashDataFile {
    // size in bytes of one entry cell
    cell_size: u64,
    mmap: MmapMut,
    // total valid bytes: header + cell_size * count
    capacity: u64,
}
55
56impl CacheHashDataFileReference {
57 pub(crate) fn map(&self) -> Result<CacheHashDataFile, std::io::Error> {
59 let file_len = self.file_len;
60 let mut m1 = Measure::start("read_file");
61 let mmap = CacheHashDataFileReference::load_map(&self.file)?;
62 m1.stop();
63 self.stats.read_us.fetch_add(m1.as_us(), Ordering::Relaxed);
64 let header_size = std::mem::size_of::<Header>() as u64;
65 if file_len < header_size {
66 return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
67 }
68
69 let cell_size = std::mem::size_of::<EntryType>() as u64;
70 unsafe {
71 assert_eq!(
72 mmap.align_to::<EntryType>().0.len(),
73 0,
74 "mmap is not aligned"
75 );
76 }
77 assert_eq!((cell_size as usize) % std::mem::size_of::<u64>(), 0);
78 let mut cache_file = CacheHashDataFile {
79 mmap,
80 cell_size,
81 capacity: 0,
82 };
83 let header = cache_file.get_header_mut();
84 let entries = header.count;
85
86 let capacity = cell_size * (entries as u64) + header_size;
87 if file_len < capacity {
88 return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
89 }
90 cache_file.capacity = capacity;
91 assert_eq!(
92 capacity, file_len,
93 "expected: {capacity}, len on disk: {file_len} {}, entries: {entries}, cell_size: {cell_size}", self.path.display(),
94 );
95
96 self.stats
97 .total_entries
98 .fetch_add(entries, Ordering::Relaxed);
99 self.stats
100 .cache_file_size
101 .fetch_add(capacity as usize, Ordering::Relaxed);
102
103 self.stats.loaded_from_cache.fetch_add(1, Ordering::Relaxed);
104 self.stats
105 .entries_loaded_from_cache
106 .fetch_add(entries, Ordering::Relaxed);
107 Ok(cache_file)
108 }
109
110 fn load_map(file: &File) -> Result<MmapMut, std::io::Error> {
111 Ok(unsafe { MmapMut::map_mut(file).unwrap() })
112 }
113}
114
115impl CacheHashDataFile {
116 pub fn get_cache_hash_data(&self) -> &[EntryType] {
118 self.get_slice(0)
119 }
120
121 #[cfg(test)]
122 pub fn load_all(
124 &self,
125 accumulator: &mut SavedType,
126 start_bin_index: usize,
127 bin_calculator: &PubkeyBinCalculator24,
128 ) {
129 let mut m2 = Measure::start("decode");
130 let slices = self.get_cache_hash_data();
131 for d in slices {
132 let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
133 assert!(
134 pubkey_to_bin_index >= start_bin_index,
135 "{pubkey_to_bin_index}, {start_bin_index}"
136 ); pubkey_to_bin_index -= start_bin_index;
138 accumulator[pubkey_to_bin_index].push(*d); }
140
141 m2.stop();
142 }
143
144 fn get_mut(&mut self, ix: u64) -> &mut EntryType {
146 let start = self.get_element_offset_byte(ix);
147 let end = start + std::mem::size_of::<EntryType>();
148 assert!(
149 end <= self.capacity as usize,
150 "end: {end}, capacity: {}, ix: {ix}, cell size: {}",
151 self.capacity,
152 self.cell_size,
153 );
154 let bytes = &mut self.mmap[start..end];
155 bytemuck::from_bytes_mut(bytes)
156 }
157
158 fn get_slice(&self, ix: u64) -> &[EntryType] {
160 let start = self.get_element_offset_byte(ix);
161 let bytes = &self.mmap[start..];
162 debug_assert_eq!(bytes.len() % std::mem::size_of::<EntryType>(), 0);
164 bytemuck::cast_slice(bytes)
165 }
166
167 fn get_element_offset_byte(&self, ix: u64) -> usize {
169 let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
170 debug_assert_eq!(start % std::mem::align_of::<EntryType>(), 0);
171 start
172 }
173
174 fn get_header_mut(&mut self) -> &mut Header {
175 let bytes = &mut self.mmap[..std::mem::size_of::<Header>()];
176 bytemuck::from_bytes_mut(bytes)
177 }
178
179 fn new_map(file: impl AsRef<Path>, capacity: u64) -> Result<MmapMut, std::io::Error> {
180 let mut data = OpenOptions::new()
181 .read(true)
182 .write(true)
183 .create_new(true)
184 .open(file)?;
185
186 data.seek(SeekFrom::Start(capacity - 1)).unwrap();
190 data.write_all(&[0]).unwrap();
191 data.rewind().unwrap();
192 data.flush().unwrap();
193 Ok(unsafe { MmapMut::map_mut(&data).unwrap() })
194 }
195}
196
/// Manages a directory of cache hash data files, tracking which pre-existing
/// files get reused so the remainder can be deleted on drop.
pub(crate) struct CacheHashData {
    cache_dir: PathBuf,
    // file names that existed in cache_dir at construction and have not (yet)
    // been reused; whatever remains here is deleted per `deletion_policy`
    pre_existing_cache_files: Arc<Mutex<HashSet<PathBuf>>>,
    deletion_policy: DeletionPolicy,
    pub stats: Arc<CacheHashDataStats>,
}
203
impl Drop for CacheHashData {
    fn drop(&mut self) {
        // remove cache files that were never reused, per the deletion policy,
        // then emit accumulated stats
        self.delete_old_cache_files();
        self.stats.report();
    }
}
210
211impl CacheHashData {
212 pub(crate) fn new(cache_dir: PathBuf, deletion_policy: DeletionPolicy) -> CacheHashData {
213 std::fs::create_dir_all(&cache_dir).unwrap_or_else(|err| {
214 panic!("error creating cache dir {}: {err}", cache_dir.display())
215 });
216
217 let result = CacheHashData {
218 cache_dir,
219 pre_existing_cache_files: Arc::new(Mutex::new(HashSet::default())),
220 deletion_policy,
221 stats: Arc::new(CacheHashDataStats::default()),
222 };
223
224 result.get_cache_files();
225 result
226 }
227
228 pub(crate) fn delete_old_cache_files(&self) {
230 let mut old_cache_files =
233 std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());
234
235 match self.deletion_policy {
236 DeletionPolicy::AllUnused => {
237 }
239 DeletionPolicy::UnusedAtLeast(storages_start_slot) => {
240 old_cache_files.retain(|old_cache_file| {
243 let Some(parsed_filename) = parse_filename(old_cache_file) else {
244 return true;
246 };
247
248 parsed_filename.slot_range_start >= storages_start_slot
251 });
252 }
253 }
254
255 if !old_cache_files.is_empty() {
256 self.stats
257 .unused_cache_files
258 .fetch_add(old_cache_files.len(), Ordering::Relaxed);
259 for file_name in old_cache_files.iter() {
260 let result = self.cache_dir.join(file_name);
261 let _ = fs::remove_file(result);
262 }
263 }
264 }
265
266 fn get_cache_files(&self) {
267 if self.cache_dir.is_dir() {
268 let dir = fs::read_dir(&self.cache_dir);
269 if let Ok(dir) = dir {
270 let mut pre_existing = self.pre_existing_cache_files.lock().unwrap();
271 for entry in dir.flatten() {
272 if let Some(name) = entry.path().file_name() {
273 pre_existing.insert(PathBuf::from(name));
274 }
275 }
276 self.stats
277 .cache_file_count
278 .fetch_add(pre_existing.len(), Ordering::Relaxed);
279 }
280 }
281 }
282
283 pub(crate) fn get_file_reference_to_map_later(
286 &self,
287 file_name: impl AsRef<Path>,
288 ) -> Result<CacheHashDataFileReference, std::io::Error> {
289 let path = self.cache_dir.join(&file_name);
290 let file_len = std::fs::metadata(&path)?.len();
291 let mut m1 = Measure::start("read_file");
292
293 let file = OpenOptions::new()
294 .read(true)
295 .write(true)
296 .create(false)
297 .open(&path)?;
298 m1.stop();
299 self.stats.read_us.fetch_add(m1.as_us(), Ordering::Relaxed);
300 self.pre_existing_cache_file_will_be_used(file_name);
301
302 Ok(CacheHashDataFileReference {
303 file,
304 file_len,
305 path,
306 stats: Arc::clone(&self.stats),
307 })
308 }
309
310 fn pre_existing_cache_file_will_be_used(&self, file_name: impl AsRef<Path>) {
311 self.pre_existing_cache_files
312 .lock()
313 .unwrap()
314 .remove(file_name.as_ref());
315 }
316
317 pub(crate) fn save(
319 &self,
320 file_name: impl AsRef<Path>,
321 data: &SavedTypeSlice,
322 ) -> Result<(), std::io::Error> {
323 self.save_internal(file_name, data)
324 }
325
326 fn save_internal(
327 &self,
328 file_name: impl AsRef<Path>,
329 data: &SavedTypeSlice,
330 ) -> Result<(), std::io::Error> {
331 let mut m = Measure::start("save");
332 let cache_path = self.cache_dir.join(file_name);
333 let _ignored = remove_file(&cache_path);
335 let cell_size = std::mem::size_of::<EntryType>() as u64;
336 let mut m1 = Measure::start("create save");
337 let entries = data.iter().map(Vec::len).sum::<usize>();
338 let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;
339
340 let mmap = CacheHashDataFile::new_map(&cache_path, capacity)?;
341 m1.stop();
342 self.stats
343 .create_save_us
344 .fetch_add(m1.as_us(), Ordering::Relaxed);
345 let mut cache_file = CacheHashDataFile {
346 mmap,
347 cell_size,
348 capacity,
349 };
350
351 let header = cache_file.get_header_mut();
352 header.count = entries;
353
354 self.stats
355 .cache_file_size
356 .fetch_add(capacity as usize, Ordering::Relaxed);
357 self.stats
358 .total_entries
359 .fetch_add(entries, Ordering::Relaxed);
360
361 let mut m2 = Measure::start("write_to_mmap");
362 let mut i = 0;
363 data.iter().for_each(|x| {
364 x.iter().for_each(|item| {
365 let d = cache_file.get_mut(i as u64);
366 i += 1;
367 *d = *item;
368 })
369 });
370 assert_eq!(i, entries);
371 m2.stop();
372 let (_, measure_flush_us) = measure_us!(cache_file.mmap.flush()?);
376 m.stop();
377 self.stats
378 .write_to_mmap_us
379 .fetch_add(m2.as_us(), Ordering::Relaxed);
380 self.stats
381 .flush_mmap_us
382 .fetch_add(measure_flush_us, Ordering::Relaxed);
383 self.stats.save_us.fetch_add(m.as_us(), Ordering::Relaxed);
384 self.stats.saved_to_cache.fetch_add(1, Ordering::Relaxed);
385 Ok(())
386 }
387}
388
/// The decomposed parts of a cache hash data filename
/// (see `parse_filename` for the expected format).
#[derive(Debug)]
pub struct ParsedFilename {
    pub slot_range_start: Slot,
    pub slot_range_end: Slot,
    pub bin_range_start: u64,
    pub bin_range_end: u64,
    // parsed from the hex field of the filename
    pub hash: u64,
}
398
399pub fn parse_filename(cache_filename: impl AsRef<Path>) -> Option<ParsedFilename> {
403 let filename = cache_filename.as_ref().to_string_lossy().to_string();
404 let parts: Vec<_> = filename.split('.').collect(); if parts.len() != 5 {
406 return None;
407 }
408 let slot_range_start = parts.first()?.parse().ok()?;
409 let slot_range_end = parts.get(1)?.parse().ok()?;
410 let bin_range_start = parts.get(2)?.parse().ok()?;
411 let bin_range_end = parts.get(3)?.parse().ok()?;
412 let hash = u64::from_str_radix(parts.get(4)?, 16).ok()?; Some(ParsedFilename {
414 slot_range_start,
415 slot_range_end,
416 bin_range_start,
417 bin_range_end,
418 hash,
419 })
420}
421
/// The deletion policy applied to unused cache files in
/// `CacheHashData::delete_old_cache_files()`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum DeletionPolicy {
    /// Delete *all* unused cache files
    AllUnused,
    /// Delete only the unused cache files whose slot range starts at or after
    /// this slot
    UnusedAtLeast(Slot),
}
434
#[cfg(test)]
mod tests {
    use {super::*, crate::accounts_hash::AccountHash, rand::Rng};

    // test-only convenience helpers for loading a saved cache file back in
    impl CacheHashData {
        // Loads `file_name` and bins its entries into `accumulator`.
        fn load(
            &self,
            file_name: impl AsRef<Path>,
            accumulator: &mut SavedType,
            start_bin_index: usize,
            bin_calculator: &PubkeyBinCalculator24,
        ) -> Result<(), std::io::Error> {
            let mut m = Measure::start("overall");
            let cache_file = self.load_map(file_name)?;
            cache_file.load_all(accumulator, start_bin_index, bin_calculator);
            m.stop();
            self.stats.load_us.fetch_add(m.as_us(), Ordering::Relaxed);
            Ok(())
        }

        // Opens and mmaps `file_name` from the cache dir in one step.
        fn load_map(
            &self,
            file_name: impl AsRef<Path>,
        ) -> Result<CacheHashDataFile, std::io::Error> {
            let reference = self.get_file_reference_to_map_later(file_name)?;
            reference.map()
        }
    }

    #[test]
    fn test_read_write() {
        use tempfile::TempDir;
        let tmpdir = TempDir::new().unwrap();
        let cache_dir = tmpdir.path().to_path_buf();
        std::fs::create_dir_all(&cache_dir).unwrap();

        // round-trip save/load over several bin counts, pass splits, and both
        // flattened and pre-binned input shapes
        for bins in [1, 2, 4] {
            let bin_calculator = PubkeyBinCalculator24::new(bins);
            let num_points = 5;
            let (data, _total_points) = generate_test_data(num_points, bins, &bin_calculator);
            for passes in [1, 2] {
                let bins_per_pass = bins / passes;
                if bins_per_pass == 0 {
                    continue; }
                for pass in 0..passes {
                    for flatten_data in [true, false] {
                        let mut data_this_pass = if flatten_data {
                            vec![vec![], vec![]]
                        } else {
                            vec![]
                        };
                        let start_bin_this_pass = pass * bins_per_pass;
                        for bin in 0..bins_per_pass {
                            let mut this_bin_data = data[bin + start_bin_this_pass].clone();
                            if flatten_data {
                                data_this_pass[0].append(&mut this_bin_data);
                            } else {
                                data_this_pass.push(this_bin_data);
                            }
                        }
                        let cache =
                            CacheHashData::new(cache_dir.clone(), DeletionPolicy::AllUnused);
                        let file_name = PathBuf::from("test");
                        cache.save(&file_name, &data_this_pass).unwrap();
                        // re-scan so the freshly saved file shows up as pre-existing
                        cache.get_cache_files();
                        assert_eq!(
                            cache
                                .pre_existing_cache_files
                                .lock()
                                .unwrap()
                                .iter()
                                .collect::<Vec<_>>(),
                            vec![&file_name],
                        );
                        let mut accum = (0..bins_per_pass).map(|_| vec![]).collect();
                        cache
                            .load(&file_name, &mut accum, start_bin_this_pass, &bin_calculator)
                            .unwrap();
                        if flatten_data {
                            // loading returns binned data, so bin the flat
                            // input before comparing
                            bin_data(
                                &mut data_this_pass,
                                &bin_calculator,
                                bins_per_pass,
                                start_bin_this_pass,
                            );
                        }
                        assert_eq!(
                            accum, data_this_pass,
                            "bins: {bins}, start_bin_this_pass: {start_bin_this_pass}, pass: {pass}, flatten: {flatten_data}, passes: {passes}"
                        );
                    }
                }
            }
        }
    }

    // Re-bins `data` by pubkey into `bins` buckets offset by `start_bin`.
    fn bin_data(
        data: &mut SavedType,
        bin_calculator: &PubkeyBinCalculator24,
        bins: usize,
        start_bin: usize,
    ) {
        let mut accum: SavedType = (0..bins).map(|_| vec![]).collect();
        data.drain(..).for_each(|mut x| {
            x.drain(..).for_each(|item| {
                let bin = bin_calculator.bin_from_pubkey(&item.pubkey);
                accum[bin - start_bin].push(item);
            })
        });
        *data = accum;
    }

    // Generates random per-bin test entries; returns (data, total entry count).
    fn generate_test_data(
        count: usize,
        bins: usize,
        binner: &PubkeyBinCalculator24,
    ) -> (SavedType, usize) {
        let mut rng = rand::thread_rng();
        let mut ct = 0;
        (
            (0..bins)
                .map(|bin| {
                    let rnd = rng.gen::<u64>() % (bins as u64);
                    if rnd < count as u64 {
                        (0..std::cmp::max(1, count / bins))
                            .map(|_| {
                                ct += 1;
                                let mut pk;
                                // rejection-sample a pubkey that falls into this bin
                                loop {
                                    pk = solana_pubkey::new_rand();
                                    if binner.bin_from_pubkey(&pk) == bin {
                                        break;
                                    }
                                }

                                CalculateHashIntermediate {
                                    hash: AccountHash(solana_hash::Hash::new_unique()),
                                    lamports: ct as u64,
                                    pubkey: pk,
                                }
                            })
                            .collect::<Vec<_>>()
                    } else {
                        vec![]
                    }
                })
                .collect::<Vec<_>>(),
            ct,
        )
    }

    #[test]
    #[allow(clippy::used_underscore_binding)]
    fn test_parse_filename() {
        let good_filename = "123.456.0.65536.537d65697d9b2baa";
        let parsed_filename = parse_filename(good_filename).unwrap();
        assert_eq!(parsed_filename.slot_range_start, 123);
        assert_eq!(parsed_filename.slot_range_end, 456);
        assert_eq!(parsed_filename.bin_range_start, 0);
        assert_eq!(parsed_filename.bin_range_end, 65536);
        assert_eq!(parsed_filename.hash, 0x537d65697d9b2baa);

        // each of these is malformed in some way and must fail to parse
        let bad_filenames = [
            "123-456-0-65536.537d65697d9b2baa",
            "abc.456.0.65536.537d65697d9b2baa",
            "123.xyz.0.65536.537d65697d9b2baa",
            "123.456.?.65536.537d65697d9b2baa",
            "123.456.0.@#$%^.537d65697d9b2baa",
            "123.456.0.65536.base19shouldfail",
            "123.456.0.65536.123456789012345678901234567890",
            "123.456.0.65536.",
            "123.456.0.65536",
            "123.456.0.65536.537d65697d9b2baa.42",
            "123.456.0.65536.537d65697d9b2baa.",
            "123.456.0.65536.537d65697d9b2baa/",
            ".123.456.0.65536.537d65697d9b2baa",
            "/123.456.0.65536.537d65697d9b2baa",
        ];
        for bad_filename in bad_filenames {
            assert!(parse_filename(bad_filename).is_none());
        }
    }
}