solana_accounts_db/cache_hash_data.rs

//! Cached data for hashing accounts
#[cfg(test)]
use crate::pubkey_bins::PubkeyBinCalculator24;
use {
    crate::{accounts_hash::CalculateHashIntermediate, cache_hash_data_stats::CacheHashDataStats},
    bytemuck_derive::{Pod, Zeroable},
    memmap2::MmapMut,
    solana_clock::Slot,
    solana_measure::{measure::Measure, measure_us},
    std::{
        collections::HashSet,
        fs::{self, remove_file, File, OpenOptions},
        io::{Seek, SeekFrom, Write},
        path::{Path, PathBuf},
        sync::{atomic::Ordering, Arc, Mutex},
    },
};

pub type EntryType = CalculateHashIntermediate;
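/// data to save: a slice of entry vecs, typically one vec per pubkey bin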
pub type SavedTypeSlice = [Vec<EntryType>];

#[cfg(test)]
pub type SavedType = Vec<Vec<EntryType>>;

#[repr(C)]
#[derive(Debug, Clone, Copy, Pod, Zeroable)]
pub struct Header {
    pub count: usize,
}

// In order to safely guarantee Header is Pod, it cannot have any padding.
// This is obvious by inspection, but this will also catch any inadvertent
// changes in the future (i.e. it is a test).
// Additionally, we compare the header size with `u64` instead of `usize`
// to ensure binary compatibility doesn't break.
const _: () = assert!(
    std::mem::size_of::<Header>() == std::mem::size_of::<u64>(),
    "Header cannot have any padding and must be the same size as u64",
);
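
// On disk, a cache hash data file is laid out as a `Header` holding the
// entry count, followed immediately by `count` tightly packed `EntryType`
// cells.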

/// cache hash data file to be mmapped later
pub(crate) struct CacheHashDataFileReference {
    file: File,
    file_len: u64,
    path: PathBuf,
    stats: Arc<CacheHashDataStats>,
}

/// mmapped cache hash data file
pub(crate) struct CacheHashDataFile {
    cell_size: u64,
    mmap: MmapMut,
    capacity: u64,
}

impl CacheHashDataFileReference {
    /// convert the open file reference to a mmapped file that can be returned as a slice
    pub(crate) fn map(&self) -> Result<CacheHashDataFile, std::io::Error> {
        let file_len = self.file_len;
        let mut m1 = Measure::start("read_file");
        let mmap = CacheHashDataFileReference::load_map(&self.file)?;
        m1.stop();
        self.stats.read_us.fetch_add(m1.as_us(), Ordering::Relaxed);
        let header_size = std::mem::size_of::<Header>() as u64;
        if file_len < header_size {
            return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
        }

        let cell_size = std::mem::size_of::<EntryType>() as u64;
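        // `get_slice`/`get_mut` reinterpret these bytes as `EntryType`s via
        // bytemuck, which requires the mapping's base address to be aligned
        // for `EntryType`; `align_to` returns an empty prefix iff it is.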
        unsafe {
            assert_eq!(
                mmap.align_to::<EntryType>().0.len(),
                0,
                "mmap is not aligned"
            );
        }
        assert_eq!((cell_size as usize) % std::mem::size_of::<u64>(), 0);
        let mut cache_file = CacheHashDataFile {
            mmap,
            cell_size,
            capacity: 0,
        };
        let header = cache_file.get_header_mut();
        let entries = header.count;

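        // the header's `count` must agree exactly with the file's length:
        // header + count * cell_size == file_len (verified below)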
        let capacity = cell_size * (entries as u64) + header_size;
        if file_len < capacity {
            return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
        }
        cache_file.capacity = capacity;
        assert_eq!(
            capacity,
            file_len,
            "expected: {capacity}, len on disk: {file_len} {}, entries: {entries}, cell_size: {cell_size}",
            self.path.display(),
        );

        self.stats
            .total_entries
            .fetch_add(entries, Ordering::Relaxed);
        self.stats
            .cache_file_size
            .fetch_add(capacity as usize, Ordering::Relaxed);

        self.stats.loaded_from_cache.fetch_add(1, Ordering::Relaxed);
        self.stats
            .entries_loaded_from_cache
            .fetch_add(entries, Ordering::Relaxed);
        Ok(cache_file)
    }

    fn load_map(file: &File) -> Result<MmapMut, std::io::Error> {
        // propagate any mmap failure to the caller instead of panicking
        unsafe { MmapMut::map_mut(file) }
    }
}

impl CacheHashDataFile {
    /// return a slice of a reference to all the cache hash data from the mmapped file
    pub fn get_cache_hash_data(&self) -> &[EntryType] {
        self.get_slice(0)
    }

    #[cfg(test)]
    /// Populate 'accumulator' from entire contents of the cache file.
    pub fn load_all(
        &self,
        accumulator: &mut SavedType,
        start_bin_index: usize,
        bin_calculator: &PubkeyBinCalculator24,
    ) {
        let mut m2 = Measure::start("decode");
        let slices = self.get_cache_hash_data();
        for d in slices {
            let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
            assert!(
                pubkey_to_bin_index >= start_bin_index,
                "{pubkey_to_bin_index}, {start_bin_index}"
            ); // this would indicate we put a pubkey in too high of a bin
            pubkey_to_bin_index -= start_bin_index;
            accumulator[pubkey_to_bin_index].push(*d); // may want to avoid copy here
        }

        m2.stop();
    }

    /// get '&mut EntryType' from cache file [ix]
    fn get_mut(&mut self, ix: u64) -> &mut EntryType {
        let start = self.get_element_offset_byte(ix);
        let end = start + std::mem::size_of::<EntryType>();
        assert!(
            end <= self.capacity as usize,
            "end: {end}, capacity: {}, ix: {ix}, cell size: {}",
            self.capacity,
            self.cell_size,
        );
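        // `from_bytes_mut` panics unless `bytes` is exactly one `EntryType`
        // in size and properly aligned; both hold by construction of
        // `start` and `end` above.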
        let bytes = &mut self.mmap[start..end];
        bytemuck::from_bytes_mut(bytes)
    }

    /// get '&[EntryType]' from cache file [ix..]
    fn get_slice(&self, ix: u64) -> &[EntryType] {
        let start = self.get_element_offset_byte(ix);
        let bytes = &self.mmap[start..];
        // the `bytes` slice *must* contain whole `EntryType`s
        debug_assert_eq!(bytes.len() % std::mem::size_of::<EntryType>(), 0);
        bytemuck::cast_slice(bytes)
    }

    /// return byte offset of entry 'ix' into a slice which contains a header and at least ix elements
    fn get_element_offset_byte(&self, ix: u64) -> usize {
        let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
        debug_assert_eq!(start % std::mem::align_of::<EntryType>(), 0);
        start
    }

    fn get_header_mut(&mut self) -> &mut Header {
        let bytes = &mut self.mmap[..std::mem::size_of::<Header>()];
        bytemuck::from_bytes_mut(bytes)
    }

    fn new_map(file: impl AsRef<Path>, capacity: u64) -> Result<MmapMut, std::io::Error> {
        let mut data = OpenOptions::new()
            .read(true)
            .write(true)
            .create_new(true)
            .open(file)?;

        // Theoretical performance optimization: write a zero to the end of
        // the file so that we won't have to resize it later, which may be
        // expensive.
        data.seek(SeekFrom::Start(capacity - 1))?;
        data.write_all(&[0])?;
        data.rewind()?;
        data.flush()?;
        unsafe { MmapMut::map_mut(&data) }
    }
}

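/// Manages a directory of cache hash data files.
///
/// Every file present in `cache_dir` at construction is recorded in
/// `pre_existing_cache_files`; files that get reused are removed from that
/// set, and whatever remains is deleted on drop according to
/// `deletion_policy`.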
pub(crate) struct CacheHashData {
    cache_dir: PathBuf,
    pre_existing_cache_files: Arc<Mutex<HashSet<PathBuf>>>,
    deletion_policy: DeletionPolicy,
    pub stats: Arc<CacheHashDataStats>,
}

impl Drop for CacheHashData {
    fn drop(&mut self) {
        self.delete_old_cache_files();
        self.stats.report();
    }
}

impl CacheHashData {
    pub(crate) fn new(cache_dir: PathBuf, deletion_policy: DeletionPolicy) -> CacheHashData {
        std::fs::create_dir_all(&cache_dir).unwrap_or_else(|err| {
            panic!("error creating cache dir {}: {err}", cache_dir.display())
        });

        let result = CacheHashData {
            cache_dir,
            pre_existing_cache_files: Arc::new(Mutex::new(HashSet::default())),
            deletion_policy,
            stats: Arc::new(CacheHashDataStats::default()),
        };

        result.get_cache_files();
        result
    }

    /// delete all pre-existing files that will not be used
    pub(crate) fn delete_old_cache_files(&self) {
        // all the remaining files in `pre_existing_cache_files` were *not* used for this
        // accounts hash calculation
        let mut old_cache_files =
            std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());

        match self.deletion_policy {
            DeletionPolicy::AllUnused => {
                // no additional work to do here; we will delete everything in `old_cache_files`
            }
            DeletionPolicy::UnusedAtLeast(storages_start_slot) => {
                // when calculating an incremental accounts hash, we only want to delete the unused
                // cache files *that the incremental accounts hash (IAH) considered*
                old_cache_files.retain(|old_cache_file| {
                    let Some(parsed_filename) = parse_filename(old_cache_file) else {
                        // if parsing the cache filename fails, we *do* want to delete it
                        return true;
                    };

                    // if the old cache file is in the incremental accounts hash calculation range,
                    // then delete it
                    parsed_filename.slot_range_start >= storages_start_slot
                });
            }
        }

        if !old_cache_files.is_empty() {
            self.stats
                .unused_cache_files
                .fetch_add(old_cache_files.len(), Ordering::Relaxed);
            for file_name in old_cache_files.iter() {
                let result = self.cache_dir.join(file_name);
                let _ = fs::remove_file(result);
            }
        }
    }

    fn get_cache_files(&self) {
        if self.cache_dir.is_dir() {
            let dir = fs::read_dir(&self.cache_dir);
            if let Ok(dir) = dir {
                let mut pre_existing = self.pre_existing_cache_files.lock().unwrap();
                for entry in dir.flatten() {
                    if let Some(name) = entry.path().file_name() {
                        pre_existing.insert(PathBuf::from(name));
                    }
                }
                self.stats
                    .cache_file_count
                    .fetch_add(pre_existing.len(), Ordering::Relaxed);
            }
        }
    }

    /// open a cache hash file, but don't map it yet.
    /// This lets callers confirm a file exists without increasing the number of mmapped files.
    pub(crate) fn get_file_reference_to_map_later(
        &self,
        file_name: impl AsRef<Path>,
    ) -> Result<CacheHashDataFileReference, std::io::Error> {
        let path = self.cache_dir.join(&file_name);
        let file_len = std::fs::metadata(&path)?.len();
        let mut m1 = Measure::start("read_file");

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(false)
            .open(&path)?;
        m1.stop();
        self.stats.read_us.fetch_add(m1.as_us(), Ordering::Relaxed);
        self.pre_existing_cache_file_will_be_used(file_name);

        Ok(CacheHashDataFileReference {
            file,
            file_len,
            path,
            stats: Arc::clone(&self.stats),
        })
    }

    fn pre_existing_cache_file_will_be_used(&self, file_name: impl AsRef<Path>) {
        self.pre_existing_cache_files
            .lock()
            .unwrap()
            .remove(file_name.as_ref());
    }

    /// save 'data' to 'file_name'
    pub(crate) fn save(
        &self,
        file_name: impl AsRef<Path>,
        data: &SavedTypeSlice,
    ) -> Result<(), std::io::Error> {
        self.save_internal(file_name, data)
    }

    fn save_internal(
        &self,
        file_name: impl AsRef<Path>,
        data: &SavedTypeSlice,
    ) -> Result<(), std::io::Error> {
        let mut m = Measure::start("save");
        let cache_path = self.cache_dir.join(file_name);
        // overwrite any existing file at this path
        let _ignored = remove_file(&cache_path);
        let cell_size = std::mem::size_of::<EntryType>() as u64;
        let mut m1 = Measure::start("create save");
        let entries = data.iter().map(Vec::len).sum::<usize>();
        let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;

        let mmap = CacheHashDataFile::new_map(&cache_path, capacity)?;
        m1.stop();
        self.stats
            .create_save_us
            .fetch_add(m1.as_us(), Ordering::Relaxed);
        let mut cache_file = CacheHashDataFile {
            mmap,
            cell_size,
            capacity,
        };

        let header = cache_file.get_header_mut();
        header.count = entries;

        self.stats
            .cache_file_size
            .fetch_add(capacity as usize, Ordering::Relaxed);
        self.stats
            .total_entries
            .fetch_add(entries, Ordering::Relaxed);

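        // copy each entry into its pre-allocated cell in the mmap,
        // preserving the order of `data`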
        let mut m2 = Measure::start("write_to_mmap");
        let mut i = 0;
        data.iter().for_each(|x| {
            x.iter().for_each(|item| {
                let d = cache_file.get_mut(i as u64);
                i += 1;
                *d = *item;
            })
        });
        assert_eq!(i, entries);
        m2.stop();
        // We must flush the mmap after writing, since we're about to turn around and load it for
        // reading *not* via the mmap.  If the mmap is never flushed to disk, it is possible the
        // entries will *not* be visible when the reader comes along.
        let (_, measure_flush_us) = measure_us!(cache_file.mmap.flush()?);
        m.stop();
        self.stats
            .write_to_mmap_us
            .fetch_add(m2.as_us(), Ordering::Relaxed);
        self.stats
            .flush_mmap_us
            .fetch_add(measure_flush_us, Ordering::Relaxed);
        self.stats.save_us.fetch_add(m.as_us(), Ordering::Relaxed);
        self.stats.saved_to_cache.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }
}

/// The values of each part of a cache hash data filename
#[derive(Debug)]
pub struct ParsedFilename {
    pub slot_range_start: Slot,
    pub slot_range_end: Slot,
    pub bin_range_start: u64,
    pub bin_range_end: u64,
    pub hash: u64,
}

/// Parses a cache hash data filename into its parts
///
/// Returns None if the filename is invalid
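///
/// For example, "123.456.0.65536.537d65697d9b2baa" parses to slot range
/// start/end 123/456, bin range start/end 0/65536, and hash
/// 0x537d65697d9b2baa (the hash component is hexadecimal).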
pub fn parse_filename(cache_filename: impl AsRef<Path>) -> Option<ParsedFilename> {
    let filename = cache_filename.as_ref().to_string_lossy().to_string();
    let parts: Vec<_> = filename.split('.').collect(); // The parts are separated by a `.`
    if parts.len() != 5 {
        return None;
    }
    let slot_range_start = parts.first()?.parse().ok()?;
    let slot_range_end = parts.get(1)?.parse().ok()?;
    let bin_range_start = parts.get(2)?.parse().ok()?;
    let bin_range_end = parts.get(3)?.parse().ok()?;
    let hash = u64::from_str_radix(parts.get(4)?, 16).ok()?; // the hash is in hex
    Some(ParsedFilename {
        slot_range_start,
        slot_range_end,
        bin_range_start,
        bin_range_end,
        hash,
    })
}

/// Decides which old cache files to delete
///
/// See `delete_old_cache_files()` for more info.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum DeletionPolicy {
    /// Delete *all* the unused cache files
    /// Should be used when calculating a full accounts hash
    AllUnused,
    /// Delete *only* the unused cache files whose slot range starts *at or after* this slot
    /// Should be used when calculating an incremental accounts hash
    UnusedAtLeast(Slot),
}
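
// For example, a full accounts hash calculation would construct the cache as
// `CacheHashData::new(cache_dir, DeletionPolicy::AllUnused)`, while an
// incremental accounts hash over storages starting at `slot` would use
// `CacheHashData::new(cache_dir, DeletionPolicy::UnusedAtLeast(slot))`.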

#[cfg(test)]
mod tests {
    use {super::*, crate::accounts_hash::AccountHash, rand::Rng};

    impl CacheHashData {
        /// load from 'file_name' into 'accumulator'
        fn load(
            &self,
            file_name: impl AsRef<Path>,
            accumulator: &mut SavedType,
            start_bin_index: usize,
            bin_calculator: &PubkeyBinCalculator24,
        ) -> Result<(), std::io::Error> {
            let mut m = Measure::start("overall");
            let cache_file = self.load_map(file_name)?;
            cache_file.load_all(accumulator, start_bin_index, bin_calculator);
            m.stop();
            self.stats.load_us.fetch_add(m.as_us(), Ordering::Relaxed);
            Ok(())
        }

        /// map 'file_name' into memory
        fn load_map(
            &self,
            file_name: impl AsRef<Path>,
        ) -> Result<CacheHashDataFile, std::io::Error> {
            let reference = self.get_file_reference_to_map_later(file_name)?;
            reference.map()
        }
    }

    #[test]
    fn test_read_write() {
        // generate sample data
        // write to file
        // read
        // compare
        use tempfile::TempDir;
        let tmpdir = TempDir::new().unwrap();
        let cache_dir = tmpdir.path().to_path_buf();
        std::fs::create_dir_all(&cache_dir).unwrap();

        for bins in [1, 2, 4] {
            let bin_calculator = PubkeyBinCalculator24::new(bins);
            let num_points = 5;
            let (data, _total_points) = generate_test_data(num_points, bins, &bin_calculator);
            for passes in [1, 2] {
                let bins_per_pass = bins / passes;
                if bins_per_pass == 0 {
                    continue; // illegal test case
                }
                for pass in 0..passes {
                    for flatten_data in [true, false] {
                        let mut data_this_pass = if flatten_data {
                            vec![vec![], vec![]]
                        } else {
                            vec![]
                        };
                        let start_bin_this_pass = pass * bins_per_pass;
                        for bin in 0..bins_per_pass {
                            let mut this_bin_data = data[bin + start_bin_this_pass].clone();
                            if flatten_data {
                                data_this_pass[0].append(&mut this_bin_data);
                            } else {
                                data_this_pass.push(this_bin_data);
                            }
                        }
                        let cache =
                            CacheHashData::new(cache_dir.clone(), DeletionPolicy::AllUnused);
                        let file_name = PathBuf::from("test");
                        cache.save(&file_name, &data_this_pass).unwrap();
                        cache.get_cache_files();
                        assert_eq!(
                            cache
                                .pre_existing_cache_files
                                .lock()
                                .unwrap()
                                .iter()
                                .collect::<Vec<_>>(),
                            vec![&file_name],
                        );
                        let mut accum = (0..bins_per_pass).map(|_| vec![]).collect();
                        cache
                            .load(&file_name, &mut accum, start_bin_this_pass, &bin_calculator)
                            .unwrap();
                        if flatten_data {
                            bin_data(
                                &mut data_this_pass,
                                &bin_calculator,
                                bins_per_pass,
                                start_bin_this_pass,
                            );
                        }
                        assert_eq!(
                            accum, data_this_pass,
                            "bins: {bins}, start_bin_this_pass: {start_bin_this_pass}, pass: {pass}, flatten: {flatten_data}, passes: {passes}"
                        );
                    }
                }
            }
        }
    }

    fn bin_data(
        data: &mut SavedType,
        bin_calculator: &PubkeyBinCalculator24,
        bins: usize,
        start_bin: usize,
    ) {
        let mut accum: SavedType = (0..bins).map(|_| vec![]).collect();
        data.drain(..).for_each(|mut x| {
            x.drain(..).for_each(|item| {
                let bin = bin_calculator.bin_from_pubkey(&item.pubkey);
                accum[bin - start_bin].push(item);
            })
        });
        *data = accum;
    }

    fn generate_test_data(
        count: usize,
        bins: usize,
        binner: &PubkeyBinCalculator24,
    ) -> (SavedType, usize) {
        let mut rng = rand::thread_rng();
        let mut ct = 0;
        (
            (0..bins)
                .map(|bin| {
                    let rnd = rng.gen::<u64>() % (bins as u64);
                    if rnd < count as u64 {
                        (0..std::cmp::max(1, count / bins))
                            .map(|_| {
                                ct += 1;
                                let mut pk;
                                loop {
                                    // expensive, but small numbers and for tests, so ok
                                    pk = solana_pubkey::new_rand();
                                    if binner.bin_from_pubkey(&pk) == bin {
                                        break;
                                    }
                                }

                                CalculateHashIntermediate {
                                    hash: AccountHash(solana_hash::Hash::new_unique()),
                                    lamports: ct as u64,
                                    pubkey: pk,
                                }
                            })
                            .collect::<Vec<_>>()
                    } else {
                        vec![]
                    }
                })
                .collect::<Vec<_>>(),
            ct,
        )
    }

    #[test]
    #[allow(clippy::used_underscore_binding)]
    fn test_parse_filename() {
        let good_filename = "123.456.0.65536.537d65697d9b2baa";
        let parsed_filename = parse_filename(good_filename).unwrap();
        assert_eq!(parsed_filename.slot_range_start, 123);
        assert_eq!(parsed_filename.slot_range_end, 456);
        assert_eq!(parsed_filename.bin_range_start, 0);
        assert_eq!(parsed_filename.bin_range_end, 65536);
        assert_eq!(parsed_filename.hash, 0x537d65697d9b2baa);

        let bad_filenames = [
            // bad separator
            "123-456-0-65536.537d65697d9b2baa",
            // bad values
            "abc.456.0.65536.537d65697d9b2baa",
            "123.xyz.0.65536.537d65697d9b2baa",
            "123.456.?.65536.537d65697d9b2baa",
            "123.456.0.@#$%^.537d65697d9b2baa",
            "123.456.0.65536.base19shouldfail",
            "123.456.0.65536.123456789012345678901234567890",
            // missing values
            "123.456.0.65536.",
            "123.456.0.65536",
            // extra junk
            "123.456.0.65536.537d65697d9b2baa.42",
            "123.456.0.65536.537d65697d9b2baa.",
            "123.456.0.65536.537d65697d9b2baa/",
            ".123.456.0.65536.537d65697d9b2baa",
            "/123.456.0.65536.537d65697d9b2baa",
        ];
        for bad_filename in bad_filenames {
            assert!(parse_filename(bad_filename).is_none());
        }
    }
}