graus_db/
graus_db.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use crate::db_command::{Command, CommandPos};
use crate::io_types::BufReaderWithPos;
use crate::log_storage::log_helpers::{get_log_ids, load_log, log_path, new_log_file};
use crate::log_storage::log_reader::LogReader;
use crate::log_storage::log_writer::LogWriter;
use crate::{GrausError, Result};
use crossbeam_skiplist::SkipMap;
use std::cell::RefCell;
use std::fs::{self, File};
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use std::{collections::HashMap, path::PathBuf};

/// The `GrausDb` stores string key/value pairs.
///
/// Key/value pairs are persisted to disk in log files. Log files are named after
/// monotonically increasing generation numbers with a `log` extension name.
/// A `SkipMap` in memory stores the keys and the value locations for fast query.
///
/// GrausDb is thead-safe. It can be cloned to use it on new threads.
///
/// ```rust
/// # use graus_db::{GrausDb, Result};
/// # fn try_main() -> Result<()> {
/// use std::env::current_dir;
/// let store = GrausDb::open(current_dir()?)?;
/// store.set("key".to_owned(), "value".to_owned())?;
/// let val = store.get("key".to_owned())?;
/// assert_eq!(val, Some("value".to_owned()));
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct GrausDb {
    // Index that maps every Key to a position in a log file.
    index: Arc<SkipMap<String, CommandPos>>,
    // Writes new data into the file system logs. Protected by a mutex.
    writer: Arc<Mutex<LogWriter>>,
    // Reads data from the file system logs.
    reader: LogReader,
}

impl GrausDb {
    /// Opens a `GrausDb` with the given path.
    ///
    /// This will create a new directory if the given one does not exist.
    ///
    /// # Errors
    ///
    /// It propagates I/O or deserialization errors during the log replay.
    pub fn open(path: impl Into<PathBuf>) -> Result<GrausDb> {
        let path = Arc::new(path.into());
        fs::create_dir_all(&*path)?;

        let mut readers = HashMap::new();
        let index = Arc::new(SkipMap::new());

        let log_ids = get_log_ids(&path)?;
        let mut uncompacted = 0;

        for &log_id in &log_ids {
            let log_path = log_path(&path, log_id);
            let mut reader = BufReaderWithPos::new(File::open(&log_path)?)?;
            uncompacted += load_log(log_id, &mut reader, &*index)?;
            readers.insert(log_id, reader);
        }

        let new_log_id = log_ids.last().unwrap_or(&0) + 1;
        let writer = new_log_file(&path, new_log_id)?;
        let safe_point = Arc::new(AtomicU64::new(0));

        let reader = LogReader {
            path: Arc::clone(&path),
            safe_point,
            readers: RefCell::new(readers),
        };

        let writer = LogWriter {
            writer,
            index: Arc::clone(&index),
            reader: reader.clone(),
            current_log_id: new_log_id,
            uncompacted,
            path: Arc::clone(&path),
        };

        Ok(GrausDb {
            reader,
            index,
            writer: Arc::new(Mutex::new(writer)),
        })
    }

    /// Sets the value of a string key to a string.
    ///
    /// If the key already exists, the previous value will be overwritten.
    pub fn set(&self, key: String, value: String) -> Result<()> {
        self.writer.lock().unwrap().set(key, value)
    }

    /// Gets the string value of a given string key.
    ///
    /// Returns `None` if the given key does not exist.
    pub fn get(&self, key: String) -> Result<Option<String>> {
        if let Some(cmd_pos) = self.index.get(&key) {
            if let Command::Set { value, .. } = self.reader.read_command(*cmd_pos.value())? {
                Ok(Some(value))
            } else {
                Err(GrausError::UnexpectedCommandType)
            }
        } else {
            Ok(None)
        }
    }

    /// Removes a given key.
    ///
    /// Returns GrausError::KeyNotFound if the key does not exist.
    pub fn remove(&self, key: String) -> Result<()> {
        self.writer.lock().unwrap().remove(key)
    }

    /// Updates atomically an existing value.
    ///
    /// If predicate_key and predicate are provided, it won´t update the value if the predicate
    /// is not satisfied for predicate_key.
    pub fn update_if<F, P>(
        &self,
        key: String,
        update_fn: F,
        predicate_key: Option<String>,
        predicate: Option<P>,
    ) -> Result<()>
    where
        F: FnOnce(String) -> String,
        P: FnOnce(String) -> bool,
    {
        let mut writer = self.writer.lock().unwrap();
        let current_value = self.get(key.to_owned())?;
        let Some(current_value) = current_value else {
            return Err(GrausError::KeyNotFound);
        };

        if let (Some(predicate_key), Some(predicate)) = (predicate_key, predicate) {
            let current_predicate_key_value = self.get(predicate_key)?;
            let Some(current_predicate_key_value) = current_predicate_key_value else {
                return Err(GrausError::KeyNotFound);
            };
            if !predicate(current_predicate_key_value) {
                return Err(GrausError::PredicateNotSatisfied);
            }
        }

        let updated_value = update_fn(current_value);
        writer.set(key, updated_value)
    }
}