solana_accounts_db/
utils.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
use {
    lazy_static,
    log::*,
    solana_measure::measure_time,
    std::{
        collections::HashSet,
        fs, io,
        path::{Path, PathBuf},
        sync::Mutex,
        thread,
    },
};

pub const ACCOUNTS_RUN_DIR: &str = "run";
pub const ACCOUNTS_SNAPSHOT_DIR: &str = "snapshot";

/// For all account_paths, create the run/ and snapshot/ sub directories.
/// If an account_path directory does not exist, create it.
/// It returns (account_run_paths, account_snapshot_paths) or error
pub fn create_all_accounts_run_and_snapshot_dirs(
    account_paths: &[PathBuf],
) -> std::io::Result<(Vec<PathBuf>, Vec<PathBuf>)> {
    let mut run_dirs = Vec::with_capacity(account_paths.len());
    let mut snapshot_dirs = Vec::with_capacity(account_paths.len());
    for account_path in account_paths {
        // create the run/ and snapshot/ sub directories for each account_path
        let (run_dir, snapshot_dir) = create_accounts_run_and_snapshot_dirs(account_path)?;
        run_dirs.push(run_dir);
        snapshot_dirs.push(snapshot_dir);
    }
    Ok((run_dirs, snapshot_dirs))
}

/// To allow generating a bank snapshot directory with full state information, we need to
/// hardlink account appendvec files from the runtime operation directory to a snapshot
/// hardlink directory.  This is to create the run/ and snapshot sub directories for an
/// account_path provided by the user.  These two sub directories are on the same file
/// system partition to allow hard-linking.
pub fn create_accounts_run_and_snapshot_dirs(
    account_dir: impl AsRef<Path>,
) -> std::io::Result<(PathBuf, PathBuf)> {
    let run_path = account_dir.as_ref().join(ACCOUNTS_RUN_DIR);
    let snapshot_path = account_dir.as_ref().join(ACCOUNTS_SNAPSHOT_DIR);
    if (!run_path.is_dir()) || (!snapshot_path.is_dir()) {
        // If the "run/" or "snapshot" sub directories do not exist, the directory may be from
        // an older version for which the appendvec files are at this directory.  Clean up
        // them first.
        // This will be done only once when transitioning from an old image without run directory
        // to this new version using run and snapshot directories.
        // The run/ content cleanup will be done at a later point.  The snapshot/ content persists
        // across the process boot, and will be purged by the account_background_service.
        if fs::remove_dir_all(&account_dir).is_err() {
            delete_contents_of_path(&account_dir);
        }
        fs::create_dir_all(&run_path)?;
        fs::create_dir_all(&snapshot_path)?;
    }

    Ok((run_path, snapshot_path))
}

/// Moves and asynchronously deletes the contents of a directory to avoid blocking on it.
/// The directory is re-created after the move, and should now be empty.
pub fn move_and_async_delete_path_contents(path: impl AsRef<Path>) {
    move_and_async_delete_path(&path);
    // The following could fail if the rename failed.
    // If that happens, the directory should be left as is.
    // So we ignore errors here.
    _ = std::fs::create_dir(path);
}

/// Delete directories/files asynchronously to avoid blocking on it.
/// First, in sync context, check if the original path exists, if it
/// does, rename the original path to *_to_be_deleted.
/// If there's an in-progress deleting thread for this path, return.
/// Then spawn a thread to delete the renamed path.
pub fn move_and_async_delete_path(path: impl AsRef<Path>) {
    lazy_static! {
        static ref IN_PROGRESS_DELETES: Mutex<HashSet<PathBuf>> = Mutex::new(HashSet::new());
    };

    // Grab the mutex so no new async delete threads can be spawned for this path.
    let mut lock = IN_PROGRESS_DELETES.lock().unwrap();

    // If the path does not exist, there's nothing to delete.
    if !path.as_ref().exists() {
        return;
    }

    // If the original path (`pathbuf` here) is already being deleted,
    // then the path should not be moved and deleted again.
    if lock.contains(path.as_ref()) {
        return;
    }

    let mut path_delete = path.as_ref().to_path_buf();
    path_delete.set_file_name(format!(
        "{}{}",
        path_delete.file_name().unwrap().to_str().unwrap(),
        "_to_be_deleted"
    ));
    if let Err(err) = fs::rename(&path, &path_delete) {
        warn!(
            "Cannot async delete, retrying in sync mode: failed to rename '{}' to '{}': {err}",
            path.as_ref().display(),
            path_delete.display(),
        );
        // Although the delete here is synchronous, we want to prevent another thread
        // from moving & deleting this directory via `move_and_async_delete_path`.
        lock.insert(path.as_ref().to_path_buf());
        drop(lock); // unlock before doing sync delete

        delete_contents_of_path(&path);
        IN_PROGRESS_DELETES.lock().unwrap().remove(path.as_ref());
        return;
    }

    lock.insert(path_delete.clone());
    drop(lock);
    thread::Builder::new()
        .name("solDeletePath".to_string())
        .spawn(move || {
            trace!("background deleting {}...", path_delete.display());
            let (result, measure_delete) = measure_time!(fs::remove_dir_all(&path_delete));
            if let Err(err) = result {
                panic!("Failed to async delete '{}': {err}", path_delete.display());
            }
            trace!(
                "background deleting {}... Done, and{measure_delete}",
                path_delete.display()
            );

            IN_PROGRESS_DELETES.lock().unwrap().remove(&path_delete);
        })
        .expect("spawn background delete thread");
}

/// Delete the files and subdirectories in a directory.
/// This is useful if the process does not have permission
/// to delete the top level directory it might be able to
/// delete the contents of that directory.
pub fn delete_contents_of_path(path: impl AsRef<Path>) {
    match fs::read_dir(&path) {
        Err(err) => {
            warn!(
                "Failed to delete contents of '{}': could not read dir: {err}",
                path.as_ref().display(),
            )
        }
        Ok(dir_entries) => {
            for entry in dir_entries.flatten() {
                let sub_path = entry.path();
                let result = if sub_path.is_dir() {
                    fs::remove_dir_all(&sub_path)
                } else {
                    fs::remove_file(&sub_path)
                };
                if let Err(err) = result {
                    warn!(
                        "Failed to delete contents of '{}': {err}",
                        sub_path.display(),
                    );
                }
            }
        }
    }
}

/// Creates `directories` if they do not exist, and canonicalizes their paths
pub fn create_and_canonicalize_directories(
    directories: impl IntoIterator<Item = impl AsRef<Path>>,
) -> io::Result<Vec<PathBuf>> {
    directories
        .into_iter()
        .map(create_and_canonicalize_directory)
        .collect()
}

/// Creates `directory` if it does not exist, and canonicalizes its path
pub fn create_and_canonicalize_directory(directory: impl AsRef<Path>) -> io::Result<PathBuf> {
    fs::create_dir_all(&directory)?;
    fs::canonicalize(directory)
}

#[cfg(test)]
mod tests {
    use {super::*, tempfile::TempDir};

    #[test]
    pub fn test_create_all_accounts_run_and_snapshot_dirs() {
        let (_tmp_dirs, account_paths): (Vec<TempDir>, Vec<PathBuf>) = (0..4)
            .map(|_| {
                let tmp_dir = tempfile::TempDir::new().unwrap();
                let account_path = tmp_dir.path().join("accounts");
                (tmp_dir, account_path)
            })
            .unzip();

        // create the `run/` and `snapshot/` dirs, and ensure they're there
        let (account_run_paths, account_snapshot_paths) =
            create_all_accounts_run_and_snapshot_dirs(&account_paths).unwrap();
        account_run_paths.iter().all(|path| path.is_dir());
        account_snapshot_paths.iter().all(|path| path.is_dir());

        // delete a `run/` and `snapshot/` dir, then re-create it
        let account_path_first = account_paths.first().unwrap();
        delete_contents_of_path(account_path_first);
        assert!(account_path_first.exists());
        assert!(!account_path_first.join(ACCOUNTS_RUN_DIR).exists());
        assert!(!account_path_first.join(ACCOUNTS_SNAPSHOT_DIR).exists());

        _ = create_all_accounts_run_and_snapshot_dirs(&account_paths).unwrap();
        account_run_paths.iter().all(|path| path.is_dir());
        account_snapshot_paths.iter().all(|path| path.is_dir());
    }
}