use super::tx::Transaction;
use crate::cf;
use crate::ctx::Context;
use crate::dbs::{
node::Timestamp, Attach, Capabilities, Executor, Notification, Options, Response, Session,
Variables,
};
use crate::err::Error;
use crate::iam::{Action, Auth, Error as IamError, ResourceKind, Role};
use crate::key::root::hb::Hb;
use crate::kvs::clock::SizedClock;
#[allow(unused_imports)]
use crate::kvs::clock::SystemClock;
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*, NO_LIMIT};
use crate::opt::auth::Root;
use crate::sql::{self, statements::DefineUserStatement, Base, Query, Uuid, Value};
use crate::syn;
use crate::vs::Oracle;
use channel::{Receiver, Sender};
use futures::{lock::Mutex, Future};
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tracing::instrument;
use tracing::trace;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::{SystemTime, UNIX_EPOCH};
/// Limit passed to the heartbeat scan when collecting expired heartbeats
/// (see `delete_dead_heartbeats`).
const HEARTBEAT_BATCH_SIZE: u32 = 1000;
/// Capacity of the bounded channel created by `with_notifications`.
const LQ_CHANNEL_SIZE: usize = 100;
/// Identifying fields of one live query entry.
///
/// Values of this type are produced by the node and table live-query scans
/// used in this file, and carry everything needed to rebuild the node-side
/// and table-side live-query keys.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct LqValue {
/// Node id component of the node live-query key.
pub nd: Uuid,
/// Namespace component of both live-query keys.
pub ns: String,
/// Database component of both live-query keys.
pub db: String,
/// Table component of the table live-query key.
pub tb: String,
/// Live query id; `LqType` compares and orders entries by this field only.
pub lq: Uuid,
}
/// Tags an [`LqValue`] with the scan it was read from, so node-side and
/// table-side entries can be placed in comparable sets and still be told
/// apart afterwards.
#[derive(Debug)]
pub(crate) enum LqType {
/// Entry obtained from a node live-query scan.
Nd(LqValue),
/// Entry obtained from a table live-query scan.
Tb(LqValue),
}
impl LqType {
    /// Borrows the wrapped [`LqValue`], whichever variant holds it.
    fn get_inner(&self) -> &LqValue {
        // Both variants carry the same payload type, so one arm covers them.
        match self {
            LqType::Nd(value) | LqType::Tb(value) => value,
        }
    }
}
impl PartialEq for LqType {
    /// Two entries are equal when their live query ids match; the variant and
    /// all other fields are ignored.
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.get_inner();
        let rhs = other.get_inner();
        lhs.lq == rhs.lq
    }
}
// Marker impl: equality on `LqType` is the live-query-id comparison defined in `PartialEq`.
impl Eq for LqType {}
impl PartialOrd for LqType {
    /// Orders entries by live query id.
    ///
    /// Delegates to [`Ord::cmp`] so the partial and total orderings are
    /// defined in exactly one place and cannot drift apart.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for LqType {
    /// Total order over entries, determined solely by the live query id.
    fn cmp(&self, other: &Self) -> Ordering {
        let lhs = &self.get_inner().lq;
        let rhs = &other.get_inner().lq;
        lhs.cmp(rhs)
    }
}
/// Handle to an underlying key-value store plus the settings applied to every
/// query and transaction started through it.
#[allow(dead_code)]
pub struct Datastore {
// The storage engine selected at construction time.
inner: Inner,
// Identifier of this node; set to a random v4 UUID unless overridden with `with_node_id`.
id: Uuid,
// Forwarded to the query options via `with_strict`.
strict: bool,
// Whether authentication is enabled; also gates the guest-access checks.
auth_enabled: bool,
// Optional timeout added to the query context.
query_timeout: Option<Duration>,
// Optional transaction timeout; stored here but not read in the code visible in this file section.
transaction_timeout: Option<Duration>,
// Capabilities attached to every query context.
capabilities: Capabilities,
// Shared versionstamp oracle handed to each new transaction.
versionstamp_oracle: Arc<Mutex<Oracle>>,
// Sender/receiver pair for notifications; `None` until `with_notifications` is called.
notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
// Shared clock handed to each new transaction.
clock: Arc<RwLock<SizedClock>>,
}
/// Outcome of one per-live-query bootstrap step: the live query that was
/// processed, and the error encountered for it, if any.
pub(crate) type BootstrapOperationResult = (LqValue, Option<Error>);
/// The concrete storage engine behind a [`Datastore`].
///
/// Each variant exists only when the matching `kv-*` cargo feature is enabled.
#[allow(clippy::large_enum_variant)]
pub(super) enum Inner {
// In-memory engine (selected by the `memory` path).
#[cfg(feature = "kv-mem")]
Mem(super::mem::Datastore),
// RocksDB engine (selected by `file:` and `rocksdb:` paths).
#[cfg(feature = "kv-rocksdb")]
RocksDB(super::rocksdb::Datastore),
// SpeeDB engine (selected by `speedb:` paths).
#[cfg(feature = "kv-speedb")]
SpeeDB(super::speedb::Datastore),
// IndxDB engine (selected by `indxdb:` paths).
#[cfg(feature = "kv-indxdb")]
IndxDB(super::indxdb::Datastore),
// TiKV engine (selected by `tikv:` paths).
#[cfg(feature = "kv-tikv")]
TiKV(super::tikv::Datastore),
// FoundationDB engine (selected by `fdb:` paths).
#[cfg(feature = "kv-fdb")]
FoundationDB(super::fdb::Datastore),
}
impl fmt::Display for Datastore {
    /// Writes the short name of the active storage engine
    /// (`memory`, `rocksdb`, `speedb`, `indxdb`, `tikv` or `fdb`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        #![allow(unused_variables)]
        match &self.inner {
            #[cfg(feature = "kv-mem")]
            Inner::Mem(_) => f.write_str("memory"),
            #[cfg(feature = "kv-rocksdb")]
            Inner::RocksDB(_) => f.write_str("rocksdb"),
            #[cfg(feature = "kv-speedb")]
            Inner::SpeeDB(_) => f.write_str("speedb"),
            #[cfg(feature = "kv-indxdb")]
            Inner::IndxDB(_) => f.write_str("indxdb"),
            #[cfg(feature = "kv-tikv")]
            Inner::TiKV(_) => f.write_str("tikv"),
            #[cfg(feature = "kv-fdb")]
            Inner::FoundationDB(_) => f.write_str("fdb"),
            // Only reachable in a build with no storage feature enabled.
            #[allow(unreachable_patterns)]
            _ => unreachable!(),
        }
    }
}
impl Datastore {
/// Creates a datastore for the storage engine named by `path`.
///
/// Equivalent to calling `new_full_impl` without a clock override.
pub async fn new(path: &str) -> Result<Datastore, Error> {
    let no_clock_override = None;
    Self::new_full_impl(path, no_clock_override).await
}
/// Test-only constructor that allows the datastore clock to be replaced.
///
/// When `clock_override` is `None` this behaves like [`Datastore::new`].
#[allow(dead_code)]
#[cfg(test)]
pub async fn new_full(
    path: &str,
    clock_override: Option<Arc<RwLock<SizedClock>>>,
) -> Result<Datastore, Error> {
    let datastore = Self::new_full_impl(path, clock_override).await?;
    Ok(datastore)
}
/// Builds a [`Datastore`] for the storage engine selected by `path`.
///
/// `path` is either the literal `memory` or a string starting with one of the
/// prefixes `file:`, `rocksdb:`, `speedb:`, `indxdb:`, `tikv:` or `fdb:`.
/// `clock_override`, when `Some`, replaces the system clock that would
/// otherwise be used.
///
/// # Errors
/// Returns `Error::Ds` when the path matches no known engine or when the
/// matching engine's cargo feature is not enabled in this build, and
/// propagates any error returned by the engine's own constructor.
#[allow(dead_code)]
async fn new_full_impl(
path: &str,
#[allow(unused_variables)] clock_override: Option<Arc<RwLock<SizedClock>>>,
) -> Result<Datastore, Error> {
// NOTE(review): this outer clock is only consumed by the fallback arm below;
// every engine arm shadows it with a freshly built one.
let default_clock: Arc<RwLock<SizedClock>> =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
// Each enabled arm yields `Ok((engine_result, clock))`; the engine's own
// `Result` is carried through unexamined and only unwrapped by `inner.map` at
// the end, so the "Started"/"Connected" log lines are emitted even when the
// engine constructor failed.
let (inner, clock): (Result<Inner, Error>, Arc<RwLock<SizedClock>>) = match path {
"memory" => {
#[cfg(feature = "kv-mem")]
{
info!("Starting kvs store in {}", path);
let v = super::mem::Datastore::new().await.map(Inner::Mem);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
info!("Started kvs store in {}", path);
Ok((v, clock))
}
#[cfg(not(feature = "kv-mem"))]
return Err(Error::Ds("Cannot connect to the `memory` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
// `file:` is served by the RocksDB engine.
s if s.starts_with("file:") => {
#[cfg(feature = "kv-rocksdb")]
{
info!("Starting kvs store at {}", path);
// Strip the scheme, with or without the `//` separator.
let s = s.trim_start_matches("file://");
let s = s.trim_start_matches("file:");
let v = super::rocksdb::Datastore::new(s).await.map(Inner::RocksDB);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
info!("Started kvs store at {}", path);
Ok((v, clock))
}
#[cfg(not(feature = "kv-rocksdb"))]
return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
s if s.starts_with("rocksdb:") => {
#[cfg(feature = "kv-rocksdb")]
{
info!("Starting kvs store at {}", path);
let s = s.trim_start_matches("rocksdb://");
let s = s.trim_start_matches("rocksdb:");
let v = super::rocksdb::Datastore::new(s).await.map(Inner::RocksDB);
info!("Started kvs store at {}", path);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
Ok((v, clock))
}
#[cfg(not(feature = "kv-rocksdb"))]
return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
s if s.starts_with("speedb:") => {
#[cfg(feature = "kv-speedb")]
{
info!("Starting kvs store at {}", path);
let s = s.trim_start_matches("speedb://");
let s = s.trim_start_matches("speedb:");
let v = super::speedb::Datastore::new(s).await.map(Inner::SpeeDB);
info!("Started kvs store at {}", path);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
Ok((v, clock))
}
#[cfg(not(feature = "kv-speedb"))]
return Err(Error::Ds("Cannot connect to the `speedb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
s if s.starts_with("indxdb:") => {
#[cfg(feature = "kv-indxdb")]
{
info!("Starting kvs store at {}", path);
let s = s.trim_start_matches("indxdb://");
let s = s.trim_start_matches("indxdb:");
let v = super::indxdb::Datastore::new(s).await.map(Inner::IndxDB);
info!("Started kvs store at {}", path);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
Ok((v, clock))
}
#[cfg(not(feature = "kv-indxdb"))]
return Err(Error::Ds("Cannot connect to the `indxdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
s if s.starts_with("tikv:") => {
#[cfg(feature = "kv-tikv")]
{
info!("Connecting to kvs store at {}", path);
let s = s.trim_start_matches("tikv://");
let s = s.trim_start_matches("tikv:");
let v = super::tikv::Datastore::new(s).await.map(Inner::TiKV);
info!("Connected to kvs store at {}", path);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
Ok((v, clock))
}
#[cfg(not(feature = "kv-tikv"))]
return Err(Error::Ds("Cannot connect to the `tikv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
s if s.starts_with("fdb:") => {
#[cfg(feature = "kv-fdb")]
{
info!("Connecting to kvs store at {}", path);
let s = s.trim_start_matches("fdb://");
let s = s.trim_start_matches("fdb:");
let v = super::fdb::Datastore::new(s).await.map(Inner::FoundationDB);
info!("Connected to kvs store at {}", path);
let default_clock =
Arc::new(RwLock::new(SizedClock::System(SystemClock::new())));
let clock = clock_override.unwrap_or(default_clock);
Ok((v, clock))
}
#[cfg(not(feature = "kv-fdb"))]
return Err(Error::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
// Unknown path: explicitly consume the otherwise-unused bindings, then fail.
_ => {
let _ = (clock_override, default_clock);
info!("Unable to load the specified datastore {}", path);
Err(Error::Ds("Unable to load the specified datastore".into()))
}
}?;
// Wrap the engine (if it was created successfully) with default settings;
// callers adjust them afterwards through the `with_*` builder methods.
inner.map(|inner| Self {
id: Uuid::new_v4(),
inner,
strict: false,
auth_enabled: false,
query_timeout: None,
transaction_timeout: None,
notification_channel: None,
capabilities: Capabilities::default(),
versionstamp_oracle: Arc::new(Mutex::new(Oracle::systime_counter())),
clock,
})
}
/// Builder step: replaces the node id of this datastore and returns it.
pub fn with_node_id(self, id: Uuid) -> Self {
    let mut datastore = self;
    datastore.id = id;
    datastore
}
/// Builder step: sets whether queries run in strict mode and returns the datastore.
pub fn with_strict_mode(self, strict: bool) -> Self {
    let mut datastore = self;
    datastore.strict = strict;
    datastore
}
/// Builder step: enables notifications by creating a bounded channel of
/// `LQ_CHANNEL_SIZE` entries and storing both of its ends.
pub fn with_notifications(mut self) -> Self {
    let (sender, receiver) = channel::bounded(LQ_CHANNEL_SIZE);
    self.notification_channel = Some((sender, receiver));
    self
}
/// Builder step: sets the optional query timeout (`None` disables it) and
/// returns the datastore.
pub fn with_query_timeout(self, duration: Option<Duration>) -> Self {
    let mut datastore = self;
    datastore.query_timeout = duration;
    datastore
}
/// Builder step: stores the optional transaction timeout and returns the datastore.
pub fn with_transaction_timeout(self, duration: Option<Duration>) -> Self {
    let mut datastore = self;
    datastore.transaction_timeout = duration;
    datastore
}
/// Builder step: turns authentication on or off and returns the datastore.
pub fn with_auth_enabled(self, enabled: bool) -> Self {
    let mut datastore = self;
    datastore.auth_enabled = enabled;
    datastore
}
/// Builder step: replaces the capabilities attached to every query context
/// and returns the datastore.
pub fn with_capabilities(self, caps: Capabilities) -> Self {
    let mut datastore = self;
    datastore.capabilities = caps;
    datastore
}
pub fn is_auth_enabled(&self) -> bool {
self.auth_enabled
}
/// Creates the initial root user from `creds`, but only when the datastore
/// has no root users yet.
///
/// When root users already exist, warnings are logged and nothing is
/// written. The transaction is committed only after the user definition has
/// been computed; in the other handled paths it is cancelled.
///
/// # Errors
/// Propagates errors from opening the transaction, listing root users,
/// computing the user definition, and committing or cancelling.
#[allow(unreachable_code, unused_variables)]
pub async fn setup_initial_creds(&self, creds: Root<'_>) -> Result<(), Error> {
    let txn = self.transaction(Write, Optimistic).await?.rollback_with_panic().enclose();
    // Bind the listing first so the transaction lock is released before it is taken again below.
    let listing = txn.lock().await.all_root_users().await;
    let existing = match listing {
        Ok(users) => users,
        Err(e) => {
            txn.lock().await.cancel().await?;
            return Err(e);
        }
    };
    if !existing.is_empty() {
        warn!("Credentials were provided, but existing root users were found. The root user '{}' will not be created", creds.username);
        warn!("Consider removing the --user and --pass arguments from the server start command");
        txn.lock().await.cancel().await?;
        return Ok(());
    }
    info!("Credentials were provided, and no root users were found. The root user '{}' will be created", creds.username);
    let stm = DefineUserStatement::from((Base::Root, creds.username, creds.password));
    let ctx = Context::default();
    // The definition is computed with root owner authority.
    let opt = Options::new().with_auth(Arc::new(Auth::for_root(Role::Owner)));
    // NOTE(review): a failure here returns without cancelling the transaction — confirm that is intended.
    let _ = stm.compute(&ctx, &opt, &txn, None).await?;
    txn.lock().await.commit().await?;
    Ok(())
}
pub async fn bootstrap(&self) -> Result<(), Error> {
trace!("Clearing unreachable state");
let mut tx = self.transaction(Write, Optimistic).await?;
match self.clear_unreachable_state(&mut tx).await {
Ok(_) => tx.commit().await,
Err(e) => {
let msg = format!("Error clearing unreachable cluster state at bootstrap: {:?}", e);
error!(msg);
tx.cancel().await?;
Err(Error::Tx(msg))
}
}?;
trace!("Bootstrapping {}", self.id);
let mut tx = self.transaction(Write, Optimistic).await?;
let archived = match self.register_remove_and_archive(&mut tx, &self.id).await {
Ok(archived) => {
tx.commit().await?;
archived
}
Err(e) => {
error!("Error bootstrapping mark phase: {:?}", e);
tx.cancel().await?;
return Err(e);
}
};
let mut filtered: Vec<LqValue> = vec![];
let mut err = vec![];
for res in archived {
match res {
(lq, Some(e)) => {
filtered.push(lq);
err.push(e);
}
(lq, None) => {
filtered.push(lq);
}
}
}
let mut tx = self.transaction(Write, Optimistic).await?;
let val = self.remove_archived(&mut tx, filtered).await;
let resolve_err = match val {
Ok(_) => tx.commit().await,
Err(e) => {
error!("Error bootstrapping sweep phase: {:?}", e);
match tx.cancel().await {
Ok(_) => Err(e),
Err(e) => {
Err(Error::Tx(format!("Error bootstrapping sweep phase: {:?} and error cancelling transaction: {:?}", e, e)))
}
}
}
};
if let Err(e) = resolve_err {
err.push(e);
}
if !err.is_empty() {
error!("Error bootstrapping sweep phase: {:?}", err);
return Err(Error::Tx(format!("Error bootstrapping sweep phase: {:?}", err)));
}
Ok(())
}
/// Registers `node_id` in the cluster, removes nodes whose heartbeat has
/// expired, and archives the live queries of those dead nodes.
///
/// A heartbeat counts as expired when it is older than the transaction
/// clock's current timestamp minus five seconds.
///
/// Returns one [`BootstrapOperationResult`] per archived live query. All work
/// happens inside the caller's transaction `tx`; committing or cancelling is
/// left to the caller.
///
/// # Errors
/// Propagates errors from registering the node, computing the expiry
/// timestamp, removing dead nodes, or archiving their live queries.
pub async fn register_remove_and_archive(
    &self,
    tx: &mut Transaction,
    node_id: &Uuid,
) -> Result<Vec<BootstrapOperationResult>, Error> {
    trace!("Registering node {}", node_id);
    let timestamp = tx.clock().await;
    self.register_membership(tx, node_id, timestamp).await?;
    // Anything last seen before this timestamp is treated as dead.
    let ts_expired = (&timestamp - &sql::duration::Duration::from_secs(5))?;
    let dead = self.remove_dead_nodes(tx, &ts_expired).await?;
    trace!("Archiving dead nodes: {:?}", dead);
    self.archive_dead_lqs(tx, &dead, node_id).await
}
/// Records `node_id` as a cluster member and writes its heartbeat at
/// `timestamp`, both inside the caller's transaction.
///
/// # Errors
/// Propagates the first error from either write.
pub async fn register_membership(
    &self,
    tx: &mut Transaction,
    node_id: &Uuid,
    timestamp: Timestamp,
) -> Result<(), Error> {
    let raw_id = node_id.0;
    tx.set_nd(raw_id).await?;
    tx.set_hb(timestamp, raw_id).await
}
pub async fn remove_dead_nodes(
&self,
tx: &mut Transaction,
ts: &Timestamp,
) -> Result<Vec<Uuid>, Error> {
let hbs = self.delete_dead_heartbeats(tx, ts).await?;
trace!("Found {} expired heartbeats", hbs.len());
let mut nodes = vec![];
for hb in hbs {
trace!("Deleting node {}", &hb.nd);
tx.del_nd(hb.nd).await?;
nodes.push(crate::sql::uuid::Uuid::from(hb.nd));
}
Ok(nodes)
}
/// Archives the live queries belonging to each node in `nodes`, marking them
/// as archived by `this_node_id`.
///
/// Returns the combined per-live-query results. A failure while archiving for
/// one entry is logged and contributes no results; it does not abort the run.
///
/// # Errors
/// Propagates errors from scanning a node's live queries.
pub async fn archive_dead_lqs(
    &self,
    tx: &mut Transaction,
    nodes: &[Uuid],
    this_node_id: &Uuid,
) -> Result<Vec<BootstrapOperationResult>, Error> {
    let mut archived = vec![];
    for nd in nodes {
        trace!("Archiving node {}", &nd);
        let node_lqs = tx.scan_ndlq(nd, NO_LIMIT).await?;
        trace!("Found {} LQ entries for {:?}", node_lqs.len(), nd);
        for lq in node_lqs {
            trace!("Archiving query {:?}", &lq);
            // NOTE(review): the archive call is keyed by the entry's node id, so it is
            // repeated once per live query of the same node — confirm this is intended.
            let outcome = self.archive_lv_for_node(tx, &lq.nd, *this_node_id).await;
            let node_archived_lqs = outcome.unwrap_or_else(|e| {
                error!("Error archiving lqs during bootstrap phase: {:?}", e);
                vec![]
            });
            archived.extend(node_archived_lqs);
        }
    }
    Ok(archived)
}
/// Deletes both the node-side and the table-side key of every live query in
/// `archived`, inside the caller's transaction.
///
/// # Errors
/// Stops at, and returns, the first deletion error.
pub async fn remove_archived(
    &self,
    tx: &mut Transaction,
    archived: Vec<LqValue>,
) -> Result<(), Error> {
    trace!("Gone into removing archived: {:?}", archived.len());
    for entry in archived {
        // Node-side key first, then the table-side key.
        let node_key = crate::key::node::lq::new(entry.nd.0, entry.lq.0, &entry.ns, &entry.db);
        tx.del(node_key).await?;
        let table_key = crate::key::table::lq::new(&entry.ns, &entry.db, &entry.tb, entry.lq.0);
        tx.del(table_key).await?;
    }
    Ok(())
}
pub async fn clear_unreachable_state(&self, tx: &mut Transaction) -> Result<(), Error> {
let cluster = tx.scan_nd(NO_LIMIT).await?;
trace!("Found {} nodes", cluster.len());
let mut unreachable_nodes = BTreeMap::new();
for cl in &cluster {
unreachable_nodes.insert(cl.name.clone(), cl.clone());
}
let end_of_time = Timestamp {
value: u64::MAX - 1,
};
let hbs = tx.scan_hb(&end_of_time, NO_LIMIT).await?;
trace!("Found {} heartbeats", hbs.len());
for hb in hbs {
match unreachable_nodes.remove(&hb.nd.to_string()) {
None => {
tx.del_hb(hb.hb, hb.nd).await?;
}
Some(_) => {}
}
}
for (_, cl) in unreachable_nodes {
trace!("Removing unreachable node {}", cl.name);
tx.del_nd(
uuid::Uuid::parse_str(&cl.name).map_err(|e| {
Error::Unimplemented(format!("cluster id was not uuid: {:?}", e))
})?,
)
.await?;
}
let mut nd_lq_set: BTreeSet<LqType> = BTreeSet::new();
for cl in &cluster {
let nds = tx.scan_ndlq(&uuid::Uuid::parse_str(&cl.name).map_err(|e| {
Error::Unimplemented(format!("cluster id was not uuid when parsing to aggregate cluster live queries: {:?}", e))
})?, NO_LIMIT).await?;
nd_lq_set.extend(nds.into_iter().map(LqType::Nd));
}
trace!("Found {} node live queries", nd_lq_set.len());
let mut tb_lq_set: BTreeSet<LqType> = BTreeSet::new();
for ndlq in &nd_lq_set {
let lq = ndlq.get_inner();
let tbs = tx.scan_tblq(&lq.ns, &lq.db, &lq.tb, NO_LIMIT).await?;
tb_lq_set.extend(tbs.into_iter().map(LqType::Tb));
}
trace!("Found {} table live queries", tb_lq_set.len());
for missing in nd_lq_set.symmetric_difference(&tb_lq_set) {
match missing {
LqType::Nd(ndlq) => {
warn!("Deleting ndlq {:?}", &ndlq);
tx.del_ndlq(ndlq.nd.0, ndlq.lq.0, &ndlq.ns, &ndlq.db).await?;
}
LqType::Tb(tblq) => {
warn!("Deleting tblq {:?}", &tblq);
tx.del_tblq(&tblq.ns, &tblq.db, &tblq.tb, tblq.lq.0).await?;
}
}
}
trace!("Successfully cleared cluster of unreachable state");
Ok(())
}
/// Deletes the node-side and table-side entries of those live queries on this
/// node whose ids appear in `live_queries`, in a single write transaction.
///
/// # Errors
/// Propagates errors from opening the transaction, scanning, deleting, or
/// committing.
pub async fn garbage_collect_dead_session(
    &self,
    live_queries: &[uuid::Uuid],
) -> Result<(), Error> {
    let mut tx = self.transaction(Write, Optimistic).await?;
    let node_lqs = tx.scan_ndlq(&self.id, NO_LIMIT).await?;
    // First pass: delete the node-side keys, remembering which entries matched.
    let mut matched = vec![];
    for entry in node_lqs {
        if !live_queries.contains(&entry.lq) {
            continue;
        }
        let node_key = crate::key::node::lq::Lq::new(
            entry.nd.0,
            entry.lq.0,
            entry.ns.as_str(),
            entry.db.as_str(),
        );
        tx.del(node_key).await?;
        trace!("Deleted lq {:?} as part of session garbage collection", &entry);
        matched.push(entry);
    }
    // Second pass: delete the table-side key of every matched entry.
    for entry in matched {
        let table_key = crate::key::table::lq::new(
            entry.ns.as_str(),
            entry.db.as_str(),
            entry.tb.as_str(),
            entry.lq.0,
        );
        tx.del(table_key.clone()).await?;
        trace!("Deleted lv {:?} as part of session garbage collection", table_key);
    }
    tx.commit().await
}
/// Archives every live query registered for node `nd`, marking each as
/// archived by `this_node_id`.
///
/// Returns one result per live query: `(lq, None)` when it was archived, or
/// `(lq, Some(error))` when its table-side entry could not be loaded.
///
/// # Errors
/// Propagates errors from listing the node's live queries and from writing
/// an archived entry back.
pub async fn archive_lv_for_node(
    &self,
    tx: &mut Transaction,
    nd: &Uuid,
    this_node_id: Uuid,
) -> Result<Vec<BootstrapOperationResult>, Error> {
    let lqs = tx.all_lq(nd).await?;
    trace!("Archiving lqs and found {} LQ entries for {}", lqs.len(), nd);
    let mut results: Vec<BootstrapOperationResult> = vec![];
    for lq in lqs {
        let fetched =
            tx.get_tb_live(lq.ns.as_str(), lq.db.as_str(), lq.tb.as_str(), &lq.lq).await;
        let lv = match fetched {
            Ok(lv) => lv,
            Err(e) => {
                // Record the failure for this entry and carry on with the rest.
                error!("Error getting live query for node {}: {:?}", nd, e);
                results.push((lq, Some(e)));
                continue;
            }
        };
        let archived_lvs = lv.clone().archive(this_node_id);
        // Conditional put: the current stored value is passed alongside the archived one.
        tx.putc_tblq(&lq.ns, &lq.db, &lq.tb, archived_lvs, Some(lv)).await?;
        results.push((lq, None));
    }
    Ok(results)
}
/// Scans up to `HEARTBEAT_BATCH_SIZE` heartbeats for the cutoff `ts`, deletes
/// them together with their node entries, and returns the heartbeats found.
///
/// # Errors
/// Propagates the first scan or delete error.
pub async fn delete_dead_heartbeats(
    &self,
    tx: &mut Transaction,
    ts: &Timestamp,
) -> Result<Vec<Hb>, Error> {
    let dead = tx.scan_hb(ts, HEARTBEAT_BATCH_SIZE).await?;
    // The delete call takes ownership, so it gets a copy; the original list is returned.
    tx.delr_hb(dead.clone(), NO_LIMIT).await?;
    for heartbeat in &dead {
        tx.del_nd(heartbeat.nd).await?;
    }
    Ok(dead)
}
pub async fn tick(&self) -> Result<(), Error> {
let now = SystemTime::now().duration_since(UNIX_EPOCH).map_err(|e| {
Error::Internal(format!("Clock may have gone backwards: {:?}", e.duration()))
})?;
let ts = now.as_secs();
self.tick_at(ts).await?;
Ok(())
}
/// Runs the periodic maintenance tasks for the timestamp `ts`: first saves
/// the timestamp for versionstamps, then garbage-collects stale change feeds.
///
/// # Errors
/// Returns the error of the first task that fails; the second task is not run
/// if the first fails.
pub async fn tick_at(&self, ts: u64) -> Result<(), Error> {
    self.save_timestamp_for_versionstamp(ts).await?;
    self.garbage_collect_stale_change_feeds(ts).await
}
/// Saves `ts` as the versionstamp timestamp for every database, in a
/// dedicated write transaction.
///
/// # Errors
/// On failure the transaction is cancelled and the original error returned;
/// if cancelling fails too, both errors are combined into one `Error::Tx`.
pub async fn save_timestamp_for_versionstamp(&self, ts: u64) -> Result<(), Error> {
    let mut tx = self.transaction(Write, Optimistic).await?;
    let outcome = self.save_timestamp_for_versionstamp_impl(ts, &mut tx).await;
    match outcome {
        Ok(()) => Ok(()),
        Err(e) => match tx.cancel().await {
            Ok(_) => Err(e),
            Err(txe) => {
                Err(Error::Tx(format!("Error saving timestamp for versionstamp: {:?} and error cancelling transaction: {:?}", e, txe)))
            }
        },
    }
}
/// Writes `ts` as the versionstamp timestamp for every database of every
/// namespace, then commits `tx`.
///
/// # Errors
/// Returns the first listing, write, or commit error; the transaction is
/// left for the caller to cancel.
async fn save_timestamp_for_versionstamp_impl(
    &self,
    ts: u64,
    tx: &mut Transaction,
) -> Result<(), Error> {
    let namespaces = tx.all_ns().await?;
    for namespace in namespaces.iter() {
        let ns_name = namespace.name.as_str();
        let databases = tx.all_db(ns_name).await?;
        for database in databases.iter() {
            let db_name = database.name.as_str();
            tx.set_timestamp_for_versionstamp(ts, ns_name, db_name, true).await?;
        }
    }
    tx.commit().await
}
/// Garbage-collects stale change feed entries as of `ts`, in a dedicated
/// write transaction.
///
/// # Errors
/// On failure the transaction is cancelled and the original error returned;
/// if cancelling fails too, both errors are combined into one `Error::Tx`.
pub async fn garbage_collect_stale_change_feeds(&self, ts: u64) -> Result<(), Error> {
    let mut tx = self.transaction(Write, Optimistic).await?;
    let outcome = self.garbage_collect_stale_change_feeds_impl(ts, &mut tx).await;
    match outcome {
        Ok(()) => Ok(()),
        Err(e) => match tx.cancel().await {
            Ok(_) => Err(e),
            Err(txe) => {
                Err(Error::Tx(format!("Error garbage collecting stale change feeds: {:?} and error cancelling transaction: {:?}", e, txe)))
            }
        },
    }
}
/// Runs change feed garbage collection at `ts` (with a limit argument of
/// `Some(100)`) and then commits `tx`.
///
/// # Errors
/// Returns the garbage collection or commit error; the transaction is left
/// for the caller to cancel.
async fn garbage_collect_stale_change_feeds_impl(
    &self,
    ts: u64,
    tx: &mut Transaction,
) -> Result<(), Error> {
    let limit = Some(100);
    cf::gc_all_at(tx, ts, limit).await?;
    tx.commit().await
}
/// Writes a heartbeat for this node at the transaction clock's current
/// timestamp, in a dedicated write transaction.
///
/// # Errors
/// Propagates errors from opening the transaction, writing the heartbeat, or
/// committing.
pub async fn heartbeat(&self) -> Result<(), Error> {
    let mut tx = self.transaction(Write, Optimistic).await?;
    let now = tx.clock().await;
    let this_node = self.id;
    self.heartbeat_full(&mut tx, now, this_node).await?;
    tx.commit().await
}
/// Writes a heartbeat for `node_id` at `timestamp` inside the caller's
/// transaction; committing is left to the caller.
pub async fn heartbeat_full(
    &self,
    tx: &mut Transaction,
    timestamp: Timestamp,
    node_id: Uuid,
) -> Result<(), Error> {
    let raw_id = node_id.0;
    tx.set_hb(timestamp, raw_id).await
}
/// Opens a new transaction on the underlying storage engine.
///
/// `write` selects a read-only or writeable transaction and `lock` selects
/// optimistic or pessimistic locking. The returned transaction starts with a
/// fresh cache and change feed writer, and shares this datastore's
/// versionstamp oracle and clock.
///
/// # Errors
/// Propagates the engine's error if it cannot start a transaction.
pub async fn transaction(
    &self,
    write: TransactionType,
    lock: LockType,
) -> Result<Transaction, Error> {
    #![allow(unused_variables)]
    // The engines take plain booleans for both settings.
    let writeable = match write {
        TransactionType::Write => true,
        TransactionType::Read => false,
    };
    let pessimistic = match lock {
        LockType::Optimistic => false,
        LockType::Pessimistic => true,
    };
    let inner = match &self.inner {
        #[cfg(feature = "kv-mem")]
        Inner::Mem(v) => super::tx::Inner::Mem(v.transaction(writeable, pessimistic).await?),
        #[cfg(feature = "kv-rocksdb")]
        Inner::RocksDB(v) => {
            super::tx::Inner::RocksDB(v.transaction(writeable, pessimistic).await?)
        }
        #[cfg(feature = "kv-speedb")]
        Inner::SpeeDB(v) => {
            super::tx::Inner::SpeeDB(v.transaction(writeable, pessimistic).await?)
        }
        #[cfg(feature = "kv-indxdb")]
        Inner::IndxDB(v) => {
            super::tx::Inner::IndxDB(v.transaction(writeable, pessimistic).await?)
        }
        #[cfg(feature = "kv-tikv")]
        Inner::TiKV(v) => super::tx::Inner::TiKV(v.transaction(writeable, pessimistic).await?),
        #[cfg(feature = "kv-fdb")]
        Inner::FoundationDB(v) => {
            super::tx::Inner::FoundationDB(v.transaction(writeable, pessimistic).await?)
        }
        // Only reachable in a build with no storage feature enabled.
        #[allow(unreachable_patterns)]
        _ => unreachable!(),
    };
    #[allow(unreachable_code)]
    Ok(Transaction {
        inner,
        cache: super::cache::Cache::default(),
        cf: cf::Writer::new(),
        vso: self.versionstamp_oracle.clone(),
        clock: self.clock.clone(),
    })
}
/// Parses `txt` as a query and executes it for the given session with the
/// given variables.
///
/// # Errors
/// Returns the parse error if `txt` is not valid, otherwise whatever
/// `process` returns.
#[instrument(level = "debug", skip_all)]
pub async fn execute(
    &self,
    txt: &str,
    sess: &Session,
    vars: Variables,
) -> Result<Vec<Response>, Error> {
    let query = syn::parse(txt)?;
    self.process(query, sess, vars).await
}
/// Executes an already-parsed query for the given session with the given
/// variables.
///
/// # Errors
/// Returns `IamError::NotAllowed` when authentication is enabled, the session
/// is anonymous and guest access is not allowed. Otherwise propagates errors
/// from attaching the variables or from the executor.
#[instrument(level = "debug", skip_all)]
pub async fn process(
    &self,
    ast: Query,
    sess: &Session,
    vars: Variables,
) -> Result<Vec<Response>, Error> {
    // Anonymous sessions may only run queries when guest access is allowed.
    let guest_denied =
        self.auth_enabled && sess.au.is_anon() && !self.capabilities.allows_guest_access();
    if guest_denied {
        let denied = IamError::NotAllowed {
            actor: "anonymous".to_string(),
            action: "process".to_string(),
            resource: "query".to_string(),
        };
        return Err(denied.into());
    }
    // Per-query options derived from the session and the datastore settings.
    let opt = Options::default()
        .with_id(self.id.0)
        .with_ns(sess.ns())
        .with_db(sess.db())
        .with_live(sess.live())
        .with_auth(sess.au.clone())
        .with_strict(self.strict)
        .with_auth_enabled(self.auth_enabled);
    let mut exe = Executor::new(self);
    // Base context: capabilities, optional timeout, optional notification sender.
    let mut ctx = Context::default();
    ctx.add_capabilities(self.capabilities.clone());
    if let Some(timeout) = self.query_timeout {
        ctx.add_timeout(timeout);
    }
    if let Some((sender, _)) = &self.notification_channel {
        ctx.add_notifications(Some(sender));
    }
    // Layer the session data and then the caller's variables on top.
    let ctx = vars.attach(sess.context(ctx))?;
    exe.execute(ctx, opt, ast).await
}
/// Computes a single value for the given session with the given variables.
///
/// The value runs in its own transaction, which is writeable only when the
/// value itself is writeable. The transaction is committed when the
/// computation succeeds and the value is writeable; otherwise it is
/// cancelled.
///
/// # Errors
/// Returns `IamError::NotAllowed` when authentication is enabled, the session
/// is anonymous and guest access is not allowed. Otherwise propagates errors
/// from attaching the variables, opening, committing or cancelling the
/// transaction, and from the computation itself.
#[instrument(level = "debug", skip_all)]
pub async fn compute(
    &self,
    val: Value,
    sess: &Session,
    vars: Variables,
) -> Result<Value, Error> {
    // Only anonymous sessions are subject to the guest-access restriction,
    // matching the check in `process`. Without the `is_anon` test every
    // session, including authenticated ones, was rejected as "anonymous".
    if self.auth_enabled && sess.au.is_anon() && !self.capabilities.allows_guest_access() {
        return Err(IamError::NotAllowed {
            actor: "anonymous".to_string(),
            action: "compute".to_string(),
            resource: "value".to_string(),
        }
        .into());
    }
    // Per-query options derived from the session and the datastore settings.
    let opt = Options::default()
        .with_id(self.id.0)
        .with_ns(sess.ns())
        .with_db(sess.db())
        .with_live(sess.live())
        .with_auth(sess.au.clone())
        .with_strict(self.strict)
        .with_auth_enabled(self.auth_enabled);
    // Base context: capabilities, optional timeout, optional notification sender.
    let mut ctx = Context::default();
    ctx.add_capabilities(self.capabilities.clone());
    if let Some(timeout) = self.query_timeout {
        ctx.add_timeout(timeout);
    }
    if let Some(channel) = &self.notification_channel {
        ctx.add_notifications(Some(&channel.0));
    }
    // Layer the session data and then the caller's variables on top.
    let ctx = sess.context(ctx);
    let ctx = vars.attach(ctx)?;
    let txn = self.transaction(val.writeable().into(), Optimistic).await?.enclose();
    let res = val.compute(&ctx, &opt, &txn, None).await;
    // Commit only a successful, writeable computation; cancel everything else.
    match (res.is_ok(), val.writeable()) {
        (true, true) => txn.lock().await.commit().await?,
        (_, _) => txn.lock().await.cancel().await?,
    };
    res
}
/// Evaluates a single value for the given session with the given variables.
///
/// Unlike `compute`, no guest-access check is performed here. The value runs
/// in its own transaction, which is committed when the computation succeeds
/// and the value is writeable, and cancelled otherwise.
///
/// # Errors
/// Propagates errors from attaching the variables, opening, committing or
/// cancelling the transaction, and from the computation itself.
#[instrument(level = "debug", skip_all)]
pub async fn evaluate(
    &self,
    val: Value,
    sess: &Session,
    vars: Variables,
) -> Result<Value, Error> {
    // Per-query options derived from the session and the datastore settings.
    let opt = Options::default()
        .with_id(self.id.0)
        .with_ns(sess.ns())
        .with_db(sess.db())
        .with_live(sess.live())
        .with_auth(sess.au.clone())
        .with_strict(self.strict)
        .with_auth_enabled(self.auth_enabled);
    // Base context: capabilities, optional timeout, optional notification sender.
    let mut ctx = Context::default();
    ctx.add_capabilities(self.capabilities.clone());
    if let Some(timeout) = self.query_timeout {
        ctx.add_timeout(timeout);
    }
    if let Some((sender, _)) = &self.notification_channel {
        ctx.add_notifications(Some(sender));
    }
    // Layer the session data and then the caller's variables on top.
    let ctx = vars.attach(sess.context(ctx))?;
    let txn = self.transaction(val.writeable().into(), Optimistic).await?.enclose();
    let res = val.compute(&ctx, &opt, &txn, None).await;
    let succeeded = res.is_ok();
    let writeable = val.writeable();
    if succeeded && writeable {
        txn.lock().await.commit().await?;
    } else {
        txn.lock().await.cancel().await?;
    }
    res
}
/// Returns a clone of the notification receiver, or `None` when
/// notifications have not been enabled on this datastore.
#[instrument(level = "debug", skip_all)]
pub fn notifications(&self) -> Option<Receiver<Notification>> {
    self.notification_channel.as_ref().map(|(_, receiver)| receiver.clone())
}
/// Returns a clone of the notification sender wrapped in a fresh
/// `Arc<RwLock<_>>`, or `None` when notifications have not been enabled.
#[allow(dead_code)]
pub(crate) fn live_sender(&self) -> Option<Arc<RwLock<Sender<Notification>>>> {
    let (sender, _) = self.notification_channel.as_ref()?;
    Some(Arc::new(RwLock::new(sender.clone())))
}
/// Prepares an export of database `db` in namespace `ns` to the channel `chn`.
///
/// The permission check and the read transaction are set up eagerly; the
/// export itself runs only when the returned future is awaited. The check is
/// skipped only when authentication is disabled and the session is anonymous.
///
/// # Errors
/// Fails up front if the session may not view the database or the
/// transaction cannot be opened. The returned future yields any export error.
#[instrument(level = "debug", skip(self, sess, chn))]
pub async fn export(
    &self,
    sess: &Session,
    ns: String,
    db: String,
    chn: Sender<Vec<u8>>,
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
    let must_authorise = self.is_auth_enabled() || !sess.au.is_anon();
    if must_authorise {
        sess.au.is_allowed(Action::View, &ResourceKind::Any.on_db(&ns, &db))?;
    }
    let mut reader = self.transaction(Read, Optimistic).await?;
    let export_task = async move {
        reader.export(&ns, &db, chn).await?;
        Ok(())
    };
    Ok(export_task)
}
/// Imports data by executing `sql` for the given session, with no variables.
///
/// Edit permission at the session's own auth level is required, unless
/// authentication is disabled and the session is anonymous.
///
/// # Errors
/// Fails if the session lacks the edit permission, otherwise returns whatever
/// `execute` returns.
#[instrument(level = "debug", skip(self, sess, sql))]
pub async fn import(&self, sql: &str, sess: &Session) -> Result<Vec<Response>, Error> {
    let must_authorise = self.is_auth_enabled() || !sess.au.is_anon();
    if must_authorise {
        let level = sess.au.level().to_owned();
        sess.au.is_allowed(Action::Edit, &ResourceKind::Any.on_level(level))?;
    }
    self.execute(sql, sess, None).await
}
}