graus_db/graus_db.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
use crate::db_command::{Command, CommandPos};
use crate::io_types::BufReaderWithPos;
use crate::log_storage::log_helpers::{get_log_ids, load_log, log_path, new_log_file};
use crate::log_storage::log_reader::LogReader;
use crate::log_storage::log_writer::LogWriter;
use crate::{GrausError, Result};
use crossbeam_skiplist::SkipMap;
use std::cell::RefCell;
use std::fs::{self, File};
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use std::{collections::HashMap, path::PathBuf};
/// `GrausDb` is a persistent store of string key/value pairs.
///
/// Key/value pairs are persisted to disk in log files. Log files are named after
/// monotonically increasing generation numbers with a `log` extension name.
/// An in-memory `SkipMap` maps each key to the location of its value on disk so
/// that lookups do not have to scan the logs.
///
/// `GrausDb` is thread-safe. Clone the handle to use it from another thread.
///
/// ```rust
/// # use graus_db::{GrausDb, Result};
/// # fn try_main() -> Result<()> {
/// use std::env::current_dir;
/// let store = GrausDb::open(current_dir()?)?;
/// store.set("key".to_owned(), "value".to_owned())?;
/// let val = store.get("key".to_owned())?;
/// assert_eq!(val, Some("value".to_owned()));
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct GrausDb {
// In-memory index mapping every key to the position of its latest command
// in a log file. Held behind an `Arc`, so all clones share the same map.
index: Arc<SkipMap<String, CommandPos>>,
// Appends new commands to the log files on disk. A single writer is shared by
// all clones and guarded by a mutex, so writes are serialized.
writer: Arc<Mutex<LogWriter>>,
// Reads commands back from the log files on disk. Cloned together with the
// handle (see `LogReader`'s `Clone` impl for what is shared between clones).
reader: LogReader,
}
impl GrausDb {
    /// Opens the `GrausDb` stored at `path`.
    ///
    /// The directory (and any missing parents) is created when it does not exist
    /// yet. Every existing log file is replayed to rebuild the in-memory index,
    /// and a fresh log file, numbered one past the last existing log (or `1`
    /// when there is none), is created to receive new writes.
    ///
    /// # Errors
    ///
    /// It propagates I/O or deserialization errors during the log replay, as
    /// well as any failure to create the directory or the new log file.
    pub fn open(path: impl Into<PathBuf>) -> Result<GrausDb> {
        let path = Arc::new(path.into());
        fs::create_dir_all(&*path)?;

        let index = Arc::new(SkipMap::new());
        let mut readers = HashMap::new();
        // Running total reported by `load_log` across all replayed logs; it
        // seeds the writer's `uncompacted` counter.
        let mut uncompacted = 0;

        let log_ids = get_log_ids(&path)?;
        for log_id in log_ids.iter().copied() {
            let file_path = log_path(&path, log_id);
            let mut log_reader = BufReaderWithPos::new(File::open(&file_path)?)?;
            uncompacted += load_log(log_id, &mut log_reader, &*index)?;
            // Keep the file handle around so later reads can reuse it.
            readers.insert(log_id, log_reader);
        }

        // New writes always go to a brand-new log, never to a replayed one.
        let current_log_id = log_ids.last().copied().unwrap_or(0) + 1;
        let log_file = new_log_file(&path, current_log_id)?;

        let reader = LogReader {
            path: Arc::clone(&path),
            safe_point: Arc::new(AtomicU64::new(0)),
            readers: RefCell::new(readers),
        };
        let writer = LogWriter {
            writer: log_file,
            index: Arc::clone(&index),
            reader: reader.clone(),
            current_log_id,
            uncompacted,
            path: Arc::clone(&path),
        };

        Ok(GrausDb {
            reader,
            index,
            writer: Arc::new(Mutex::new(writer)),
        })
    }

    /// Sets the value of a string key to a string.
    ///
    /// If the key already exists, the previous value will be overwritten.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying log writer reports.
    ///
    /// # Panics
    ///
    /// Panics if the writer mutex is poisoned.
    pub fn set(&self, key: String, value: String) -> Result<()> {
        self.writer.lock().unwrap().set(key, value)
    }

    /// Gets the string value of a given string key.
    ///
    /// Returns `None` if the given key does not exist.
    ///
    /// # Errors
    ///
    /// Propagates any error from reading the command out of the log, and
    /// returns `GrausError::UnexpectedCommandType` when the indexed position
    /// does not hold a `Set` command.
    pub fn get(&self, key: String) -> Result<Option<String>> {
        // A key that is absent from the index has no live value.
        let Some(entry) = self.index.get(&key) else {
            return Ok(None);
        };

        match self.reader.read_command(*entry.value())? {
            Command::Set { value, .. } => Ok(Some(value)),
            _ => Err(GrausError::UnexpectedCommandType),
        }
    }

    /// Removes a given key.
    ///
    /// Returns GrausError::KeyNotFound if the key does not exist.
    ///
    /// # Panics
    ///
    /// Panics if the writer mutex is poisoned.
    pub fn remove(&self, key: String) -> Result<()> {
        self.writer.lock().unwrap().remove(key)
    }

    /// Updates atomically an existing value.
    ///
    /// The writer lock is held from before the current value is read until the
    /// new value has been written, so no other write through the shared writer
    /// can interleave with the update.
    ///
    /// If predicate_key and predicate are provided, it won't update the value if the predicate
    /// is not satisfied for predicate_key. When only one of the two is given,
    /// no predicate check takes place.
    ///
    /// # Errors
    ///
    /// * `GrausError::KeyNotFound` if `key` has no value, or if a predicate is
    ///   checked and `predicate_key` has no value.
    /// * `GrausError::PredicateNotSatisfied` if the predicate returns `false`.
    /// * Any error raised while reading the current values or writing the new
    ///   one.
    ///
    /// # Panics
    ///
    /// Panics if the writer mutex is poisoned.
    pub fn update_if<F, P>(
        &self,
        key: String,
        update_fn: F,
        predicate_key: Option<String>,
        predicate: Option<P>,
    ) -> Result<()>
    where
        F: FnOnce(String) -> String,
        P: FnOnce(String) -> bool,
    {
        // Take the lock first: everything below must see a stable view.
        let mut writer = self.writer.lock().unwrap();

        let previous = self.get(key.clone())?.ok_or(GrausError::KeyNotFound)?;

        // The check only runs when both the key and the predicate are present.
        if let Some((guard_key, is_satisfied)) = predicate_key.zip(predicate) {
            let guard_value = self.get(guard_key)?.ok_or(GrausError::KeyNotFound)?;
            if !is_satisfied(guard_value) {
                return Err(GrausError::PredicateNotSatisfied);
            }
        }

        let replacement = update_fn(previous);
        writer.set(key, replacement)
    }
}