use super::export;
use super::tr::Transactor;
use super::tx::Transaction;
use super::version::Version;
use crate::cf;
use crate::ctx::MutableContext;
#[cfg(feature = "jwks")]
use crate::dbs::capabilities::NetTarget;
use crate::dbs::capabilities::{ExperimentalTarget, MethodTarget, RouteTarget};
use crate::dbs::node::Timestamp;
use crate::dbs::{
Attach, Capabilities, Executor, Notification, Options, Response, Session, Variables,
};
use crate::err::Error;
#[cfg(feature = "jwks")]
use crate::iam::jwks::JwksCache;
use crate::iam::{Action, Auth, Error as IamError, Resource, Role};
use crate::idx::trees::store::IndexStores;
use crate::kvs::cache::ds::DatastoreCache;
use crate::kvs::clock::SizedClock;
#[allow(unused_imports)]
use crate::kvs::clock::SystemClock;
#[cfg(not(target_family = "wasm"))]
use crate::kvs::index::IndexBuilder;
use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
use crate::sql::{statements::DefineUserStatement, Base, Query, Value};
use crate::syn;
use crate::syn::parser::{ParserSettings, StatementStream};
use async_channel::{Receiver, Sender};
use bytes::{Bytes, BytesMut};
use futures::{Future, Stream};
use reblessive::TreeStack;
use std::fmt;
#[cfg(storage)]
use std::path::PathBuf;
use std::pin::pin;
use std::sync::Arc;
use std::task::{ready, Poll};
use std::time::Duration;
#[cfg(not(target_family = "wasm"))]
use std::time::{SystemTime, UNIX_EPOCH};
#[cfg(feature = "jwks")]
use tokio::sync::RwLock;
use tracing::instrument;
use tracing::trace;
use uuid::Uuid;
#[cfg(target_family = "wasm")]
use wasmtimer::std::{SystemTime, UNIX_EPOCH};
/// The log target used for tracing output from this module.
const TARGET: &str = "surrealdb::core::kvs::ds";
/// The capacity of the bounded channel used for live-query notifications.
const LQ_CHANNEL_SIZE: usize = 15_000;
/// The role granted to the initial root user created from startup credentials.
const INITIAL_USER_ROLE: &str = "owner";
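/// The datastore instance which manages the underlying key-value engine.
///
/// A `Datastore` couples a concrete storage backend (wrapped in a
/// `TransactionFactory`) with the node identifier, capability settings,
/// query and transaction timeouts, caches, and the optional live-query
/// notification channel used when executing queries.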
#[allow(dead_code)]
#[non_exhaustive]
pub struct Datastore {
transaction_factory: TransactionFactory,
id: Uuid,
strict: bool,
auth_enabled: bool,
query_timeout: Option<Duration>,
transaction_timeout: Option<Duration>,
capabilities: Capabilities,
notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
index_stores: IndexStores,
cache: Arc<DatastoreCache>,
#[cfg(not(target_family = "wasm"))]
index_builder: IndexBuilder,
#[cfg(feature = "jwks")]
jwks_cache: Arc<RwLock<JwksCache>>,
#[cfg(storage)]
temporary_directory: Option<Arc<PathBuf>>,
}
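/// Creates transactions against the configured storage backend, pairing
/// each transaction with the shared versionstamp clock.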
#[derive(Clone)]
pub(super) struct TransactionFactory {
clock: Arc<SizedClock>,
flavor: Arc<DatastoreFlavor>,
}
impl TransactionFactory {
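/// Starts a new transaction with the given transaction type and locking behaviour.
///
/// The `TransactionType` is mapped to a boolean write flag and the `LockType`
/// to a boolean pessimistic-lock flag before being passed to the underlying
/// engine. The second element of the resulting tuple records whether the
/// engine runs in-process (memory, RocksDB, IndxDB, SurrealKV) or is a
/// remote service (TiKV, FoundationDB, SurrealCS).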
#[allow(unreachable_code)]
pub async fn transaction(
&self,
write: TransactionType,
lock: LockType,
) -> Result<Transaction, Error> {
#[allow(unused_variables)]
let write = match write {
Read => false,
Write => true,
};
#[allow(unused_variables)]
let lock = match lock {
Pessimistic => true,
Optimistic => false,
};
#[allow(unused_variables)]
let (inner, local) = match self.flavor.as_ref() {
#[cfg(feature = "kv-mem")]
DatastoreFlavor::Mem(v) => {
let tx = v.transaction(write, lock).await?;
(super::tr::Inner::Mem(tx), true)
}
#[cfg(feature = "kv-rocksdb")]
DatastoreFlavor::RocksDB(v) => {
let tx = v.transaction(write, lock).await?;
(super::tr::Inner::RocksDB(tx), true)
}
#[cfg(feature = "kv-indxdb")]
DatastoreFlavor::IndxDB(v) => {
let tx = v.transaction(write, lock).await?;
(super::tr::Inner::IndxDB(tx), true)
}
#[cfg(feature = "kv-tikv")]
DatastoreFlavor::TiKV(v) => {
let tx = v.transaction(write, lock).await?;
(super::tr::Inner::TiKV(tx), false)
}
#[cfg(feature = "kv-fdb")]
DatastoreFlavor::FoundationDB(v) => {
let tx = v.transaction(write, lock).await?;
(super::tr::Inner::FoundationDB(tx), false)
}
#[cfg(feature = "kv-surrealkv")]
DatastoreFlavor::SurrealKV(v) => {
let tx = v.transaction(write, lock).await?;
(super::tr::Inner::SurrealKV(tx), true)
}
#[cfg(feature = "kv-surrealcs")]
DatastoreFlavor::SurrealCS(v) => {
let tx = v.transaction(write, lock).await?;
(super::tr::Inner::SurrealCS(tx), false)
}
#[allow(unreachable_patterns)]
_ => unreachable!(),
};
Ok(Transaction::new(
local,
Transactor {
inner,
stash: super::stash::Stash::default(),
cf: cf::Writer::new(),
clock: self.clock.clone(),
},
))
}
}
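/// The supported storage backends, each gated behind its own `kv-*` feature flag.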
#[allow(clippy::large_enum_variant)]
pub(super) enum DatastoreFlavor {
#[cfg(feature = "kv-mem")]
Mem(super::mem::Datastore),
#[cfg(feature = "kv-rocksdb")]
RocksDB(super::rocksdb::Datastore),
#[cfg(feature = "kv-indxdb")]
IndxDB(super::indxdb::Datastore),
#[cfg(feature = "kv-tikv")]
TiKV(super::tikv::Datastore),
#[cfg(feature = "kv-fdb")]
FoundationDB(super::fdb::Datastore),
#[cfg(feature = "kv-surrealkv")]
SurrealKV(super::surrealkv::Datastore),
#[cfg(feature = "kv-surrealcs")]
SurrealCS(super::surrealcs::Datastore),
}
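// Displays the name of the storage engine backing this datastore (e.g. "memory" or "rocksdb").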
impl fmt::Display for Datastore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#![allow(unused_variables)]
match self.transaction_factory.flavor.as_ref() {
#[cfg(feature = "kv-mem")]
DatastoreFlavor::Mem(_) => write!(f, "memory"),
#[cfg(feature = "kv-rocksdb")]
DatastoreFlavor::RocksDB(_) => write!(f, "rocksdb"),
#[cfg(feature = "kv-indxdb")]
DatastoreFlavor::IndxDB(_) => write!(f, "indxdb"),
#[cfg(feature = "kv-tikv")]
DatastoreFlavor::TiKV(_) => write!(f, "tikv"),
#[cfg(feature = "kv-fdb")]
DatastoreFlavor::FoundationDB(_) => write!(f, "fdb"),
#[cfg(feature = "kv-surrealkv")]
DatastoreFlavor::SurrealKV(_) => write!(f, "surrealkv"),
#[cfg(feature = "kv-surrealcs")]
DatastoreFlavor::SurrealCS(_) => write!(f, "surrealcs"),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
}
}
impl Datastore {
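/// Creates a new datastore instance from a connection string such as
/// `memory`, `rocksdb:<path>`, `surrealkv:<path>`, `surrealcs:<address>`,
/// `tikv:<address>`, or `fdb:<config path>`.
///
/// A minimal usage sketch (not compiled as a doctest); the crate path
/// `surrealdb_core` and the surrounding async context are assumptions:
///
/// ```ignore
/// use surrealdb_core::err::Error;
/// use surrealdb_core::kvs::Datastore;
///
/// async fn open() -> Result<Datastore, Error> {
///     // An in-memory datastore (requires the `kv-mem` feature).
///     // A file-backed store could be opened with e.g. "rocksdb:/tmp/mydb".
///     Datastore::new("memory").await
/// }
/// ```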
pub async fn new(path: &str) -> Result<Self, Error> {
Self::new_with_clock(path, None).await
}
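/// As [`Self::new`], but allows the versionstamp clock to be injected.
/// When `clock` is `None` the system clock is used; callers (typically
/// tests) may pass a custom `SizedClock` to control timestamps.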
#[allow(unused_variables)]
pub async fn new_with_clock(
path: &str,
clock: Option<Arc<SizedClock>>,
) -> Result<Datastore, Error> {
let (flavor, clock): (Result<DatastoreFlavor, Error>, Arc<SizedClock>) = match path {
"memory" => {
#[cfg(feature = "kv-mem")]
{
info!(target: TARGET, "Starting kvs store in {}", path);
let v = super::mem::Datastore::new().await.map(DatastoreFlavor::Mem);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Started kvs store in {}", path);
Ok((v, c))
}
#[cfg(not(feature = "kv-mem"))]
return Err(Error::Ds("Cannot connect to the `memory` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
s if s.starts_with("file:") => {
#[cfg(feature = "kv-rocksdb")]
{
info!(target: TARGET, "Starting kvs store at {}", path);
warn!("file:// is deprecated, please use surrealkv:// or rocksdb://");
let s = s.trim_start_matches("file://");
let s = s.trim_start_matches("file:");
let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Started kvs store at {}", path);
Ok((v, c))
}
#[cfg(not(feature = "kv-rocksdb"))]
return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
s if s.starts_with("rocksdb:") => {
#[cfg(feature = "kv-rocksdb")]
{
info!(target: TARGET, "Starting kvs store at {}", path);
let s = s.trim_start_matches("rocksdb://");
let s = s.trim_start_matches("rocksdb:");
let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Started kvs store at {}", path);
Ok((v, c))
}
#[cfg(not(feature = "kv-rocksdb"))]
return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
s if s.starts_with("surrealkv") => {
#[cfg(feature = "kv-surrealkv")]
{
info!(target: TARGET, "Starting kvs store at {}", s);
let (path, enable_versions) =
super::surrealkv::Datastore::parse_start_string(s)?;
let v = super::surrealkv::Datastore::new(path, enable_versions)
.await
.map(DatastoreFlavor::SurrealKV);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Started kvs store at {} with versions {}", path, if enable_versions { "enabled" } else { "disabled" });
Ok((v, c))
}
#[cfg(not(feature = "kv-surrealkv"))]
return Err(Error::Ds("Cannot connect to the `surrealkv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
s if s.starts_with("surrealcs:") => {
#[cfg(feature = "kv-surrealcs")]
{
info!(target: TARGET, "Starting kvs store at {}", path);
let s = s.trim_start_matches("surrealcs://");
let s = s.trim_start_matches("surrealcs:");
let v =
super::surrealcs::Datastore::new(s).await.map(DatastoreFlavor::SurrealCS);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Started kvs store at {}", path);
Ok((v, c))
}
#[cfg(not(feature = "kv-surrealcs"))]
return Err(Error::Ds("Cannot connect to the `surrealcs` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
s if s.starts_with("indxdb:") => {
#[cfg(feature = "kv-indxdb")]
{
info!(target: TARGET, "Starting kvs store at {}", path);
let s = s.trim_start_matches("indxdb://");
let s = s.trim_start_matches("indxdb:");
let v = super::indxdb::Datastore::new(s).await.map(DatastoreFlavor::IndxDB);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Started kvs store at {}", path);
Ok((v, c))
}
#[cfg(not(feature = "kv-indxdb"))]
return Err(Error::Ds("Cannot connect to the `indxdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
s if s.starts_with("tikv:") => {
#[cfg(feature = "kv-tikv")]
{
info!(target: TARGET, "Connecting to kvs store at {}", path);
let s = s.trim_start_matches("tikv://");
let s = s.trim_start_matches("tikv:");
let v = super::tikv::Datastore::new(s).await.map(DatastoreFlavor::TiKV);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Connected to kvs store at {}", path);
Ok((v, c))
}
#[cfg(not(feature = "kv-tikv"))]
return Err(Error::Ds("Cannot connect to the `tikv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
s if s.starts_with("fdb:") => {
#[cfg(feature = "kv-fdb")]
{
info!(target: TARGET, "Connecting to kvs store at {}", path);
let s = s.trim_start_matches("fdb://");
let s = s.trim_start_matches("fdb:");
let v = super::fdb::Datastore::new(s).await.map(DatastoreFlavor::FoundationDB);
let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
info!(target: TARGET, "Connected to kvs store at {}", path);
Ok((v, c))
}
#[cfg(not(feature = "kv-fdb"))]
return Err(Error::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
}
_ => {
info!(target: TARGET, "Unable to load the specified datastore {}", path);
Err(Error::Ds("Unable to load the specified datastore".into()))
}
}?;
flavor.map(|flavor| {
let tf = TransactionFactory {
clock,
flavor: Arc::new(flavor),
};
Self {
id: Uuid::new_v4(),
transaction_factory: tf.clone(),
strict: false,
auth_enabled: false,
query_timeout: None,
transaction_timeout: None,
notification_channel: None,
capabilities: Capabilities::default(),
index_stores: IndexStores::default(),
#[cfg(not(target_family = "wasm"))]
index_builder: IndexBuilder::new(tf),
#[cfg(feature = "jwks")]
jwks_cache: Arc::new(RwLock::new(JwksCache::new())),
#[cfg(storage)]
temporary_directory: None,
cache: Arc::new(DatastoreCache::new()),
}
})
}
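/// Rebuilds this datastore with fresh caches, index stores, and index
/// builder, while keeping the node id, configuration, and the existing
/// transaction factory (and therefore the underlying storage engine).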
#[allow(dead_code)]
pub fn restart(self) -> Self {
Self {
id: self.id,
strict: self.strict,
auth_enabled: self.auth_enabled,
query_timeout: self.query_timeout,
transaction_timeout: self.transaction_timeout,
capabilities: self.capabilities,
notification_channel: self.notification_channel,
index_stores: Default::default(),
#[cfg(not(target_family = "wasm"))]
index_builder: IndexBuilder::new(self.transaction_factory.clone()),
#[cfg(feature = "jwks")]
jwks_cache: Arc::new(Default::default()),
#[cfg(storage)]
temporary_directory: self.temporary_directory,
transaction_factory: self.transaction_factory,
cache: Arc::new(DatastoreCache::new()),
}
}
pub fn with_node_id(mut self, id: Uuid) -> Self {
self.id = id;
self
}
pub fn with_strict_mode(mut self, strict: bool) -> Self {
self.strict = strict;
self
}
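/// Enables live-query notifications by allocating a bounded channel of
/// `LQ_CHANNEL_SIZE` messages; the receiving half is exposed through
/// [`Self::notifications`].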
pub fn with_notifications(mut self) -> Self {
self.notification_channel = Some(async_channel::bounded(LQ_CHANNEL_SIZE));
self
}
pub fn with_query_timeout(mut self, duration: Option<Duration>) -> Self {
self.query_timeout = duration;
self
}
pub fn with_transaction_timeout(mut self, duration: Option<Duration>) -> Self {
self.transaction_timeout = duration;
self
}
pub fn with_auth_enabled(mut self, enabled: bool) -> Self {
self.auth_enabled = enabled;
self
}
pub fn with_capabilities(mut self, caps: Capabilities) -> Self {
self.capabilities = caps;
self
}
#[cfg(storage)]
pub fn with_temporary_directory(mut self, path: Option<PathBuf>) -> Self {
self.temporary_directory = path.map(Arc::new);
self
}
pub fn index_store(&self) -> &IndexStores {
&self.index_stores
}
pub fn is_auth_enabled(&self) -> bool {
self.auth_enabled
}
pub fn id(&self) -> Uuid {
self.id
}
pub(crate) fn allows_rpc_method(&self, method_target: &MethodTarget) -> bool {
self.capabilities.allows_rpc_method(method_target)
}
pub fn allows_http_route(&self, route_target: &RouteTarget) -> bool {
self.capabilities.allows_http_route(route_target)
}
#[cfg(feature = "jwks")]
pub(crate) fn allows_network_target(&self, net_target: &NetTarget) -> bool {
self.capabilities.allows_network_target(net_target)
}
pub fn get_capabilities(&self) -> &Capabilities {
&self.capabilities
}
#[cfg(feature = "jwks")]
pub(crate) fn jwks_cache(&self) -> &Arc<RwLock<JwksCache>> {
&self.jwks_cache
}
pub(super) async fn clock_now(&self) -> Timestamp {
self.transaction_factory.clock.now().await
}
#[allow(dead_code)]
pub fn get_cache(&self) -> Arc<DatastoreCache> {
self.cache.clone()
}
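/// Ensures that the storage version on disk is the latest supported
/// version, returning `Error::OutdatedStorageVersion` otherwise.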
#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn check_version(&self) -> Result<Version, Error> {
let version = self.get_version().await?;
if !version.is_latest() {
return Err(Error::OutdatedStorageVersion);
}
Ok(version)
}
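/// Reads the storage version from the datastore, initialising it if it is
/// not yet set: an empty datastore is stamped with the latest version,
/// while a datastore that already contains data is assumed to predate
/// version tracking and is stamped as version 1.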
#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn get_version(&self) -> Result<Version, Error> {
let txn = self.transaction(Write, Pessimistic).await?.enclose();
let key = crate::key::version::new();
let val = match catch!(txn, txn.get(key.clone(), None).await) {
Some(v) => {
let val = TryInto::<Version>::try_into(v);
match val {
Err(err) => {
catch!(txn, txn.cancel().await);
return Err(err);
}
Ok(val) => {
catch!(txn, txn.cancel().await);
val
}
}
}
None => {
let rng = crate::key::version::proceeding();
let keys = catch!(txn, txn.keys(rng, 1, None).await);
let val = if keys.is_empty() {
Version::latest()
} else {
Version::v1()
};
let bytes: Vec<u8> = val.into();
catch!(txn, txn.replace(key, bytes).await);
catch!(txn, txn.commit().await);
val
}
};
Ok(val)
}
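/// Creates the initial root user with the `owner` role if no root users
/// exist yet; if any root user is already present the supplied
/// credentials are ignored and a warning is logged.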
#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn initialise_credentials(&self, user: &str, pass: &str) -> Result<(), Error> {
let txn = self.transaction(Write, Optimistic).await?.enclose();
let users = catch!(txn, txn.all_root_users().await);
if users.is_empty() {
info!(target: TARGET, "Credentials were provided, and no root users were found. The root user '{user}' will be created");
let stm = DefineUserStatement::from((Base::Root, user, pass, INITIAL_USER_ROLE));
let opt = Options::new().with_auth(Arc::new(Auth::for_root(Role::Owner)));
let mut ctx = MutableContext::default();
ctx.set_transaction(txn.clone());
let ctx = ctx.freeze();
catch!(txn, stm.compute(&ctx, &opt, None).await);
txn.commit().await
} else {
warn!(target: TARGET, "Credentials were provided, but existing root users were found. The root user '{user}' will not be created");
warn!(target: TARGET, "Consider removing the --user and --pass arguments from the server start command");
txn.cancel().await
}
}
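/// Performs the initial node bootstrap: registers this node, then
/// archives expired nodes and removes previously archived nodes.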
#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn bootstrap(&self) -> Result<(), Error> {
self.insert_node(self.id).await?;
self.expire_nodes().await?;
self.remove_nodes().await?;
Ok(())
}
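/// Refreshes the registration information for this node in the cluster.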
#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
pub async fn node_membership_update(&self) -> Result<(), Error> {
trace!(target: TARGET, "Updating node registration information");
self.update_node(self.id).await?;
Ok(())
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
pub async fn node_membership_expire(&self) -> Result<(), Error> {
trace!(target: TARGET, "Processing and archiving inactive nodes");
self.expire_nodes().await?;
Ok(())
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
pub async fn node_membership_remove(&self) -> Result<(), Error> {
trace!(target: TARGET, "Processing and cleaning archived nodes");
self.remove_nodes().await?;
Ok(())
}
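/// Runs changefeed garbage collection using the current system time,
/// updating versionstamp bookkeeping and cleaning up expired change feed data.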
#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
pub async fn changefeed_process(&self) -> Result<(), Error> {
trace!(target: TARGET, "Running changefeed garbage collection");
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| {
Error::Internal(format!("Clock may have gone backwards: {:?}", e.duration()))
})?
.as_secs();
self.changefeed_versionstamp(ts).await?;
self.changefeed_cleanup(ts).await?;
Ok(())
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
pub async fn changefeed_process_at(&self, ts: u64) -> Result<(), Error> {
trace!(target: TARGET, "Running changefeed garbage collection");
self.changefeed_versionstamp(ts).await?;
self.changefeed_cleanup(ts).await?;
Ok(())
}
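/// Performs datastore shutdown: removes this node's registration and then
/// shuts down the underlying storage engine.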
#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
pub async fn shutdown(&self) -> Result<(), Error> {
trace!(target: TARGET, "Running datastore shutdown operations");
self.delete_node(self.id).await?;
match self.transaction_factory.flavor.as_ref() {
#[cfg(feature = "kv-mem")]
DatastoreFlavor::Mem(v) => v.shutdown().await,
#[cfg(feature = "kv-rocksdb")]
DatastoreFlavor::RocksDB(v) => v.shutdown().await,
#[cfg(feature = "kv-indxdb")]
DatastoreFlavor::IndxDB(v) => v.shutdown().await,
#[cfg(feature = "kv-tikv")]
DatastoreFlavor::TiKV(v) => v.shutdown().await,
#[cfg(feature = "kv-fdb")]
DatastoreFlavor::FoundationDB(v) => v.shutdown().await,
#[cfg(feature = "kv-surrealkv")]
DatastoreFlavor::SurrealKV(v) => v.shutdown().await,
#[cfg(feature = "kv-surrealcs")]
DatastoreFlavor::SurrealCS(v) => v.shutdown().await,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
}
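/// Creates a new transaction on this datastore.
///
/// A minimal sketch of beginning and committing a write transaction (not
/// compiled as a doctest; the `surrealdb_core` crate path and error
/// handling context are assumptions):
///
/// ```ignore
/// use surrealdb_core::kvs::{Datastore, LockType, TransactionType};
///
/// let ds = Datastore::new("memory").await?;
/// let txn = ds.transaction(TransactionType::Write, LockType::Optimistic).await?;
/// // ... perform reads and writes through the transaction ...
/// txn.commit().await?;
/// ```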
#[allow(unreachable_code)]
pub async fn transaction(
&self,
write: TransactionType,
lock: LockType,
) -> Result<Transaction, Error> {
self.transaction_factory.transaction(write, lock).await
}
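/// Parses and executes a SurrealQL query string against the datastore,
/// returning one response per statement.
///
/// A minimal sketch (not compiled as a doctest); the `surrealdb_core`
/// crate path and the `Session` construction shown here are assumptions:
///
/// ```ignore
/// use surrealdb_core::dbs::Session;
/// use surrealdb_core::kvs::Datastore;
///
/// let ds = Datastore::new("memory").await?;
/// let ses = Session::owner().with_ns("test").with_db("test");
/// let res = ds.execute("CREATE person SET name = 'Tobie';", &ses, None).await?;
/// ```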
#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn execute(
&self,
txt: &str,
sess: &Session,
vars: Variables,
) -> Result<Vec<Response>, Error> {
let ast = syn::parse_with_capabilities(txt, &self.capabilities)?;
self.process(ast, sess, vars).await
}
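/// Executes a streamed import of SurrealQL statements. Incoming byte
/// chunks are buffered and parsed incrementally: partial parses are
/// retried with a buffer that grows in powers of two, and the remaining
/// buffer is parsed to completion once the input stream ends.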
#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn execute_import<S>(
&self,
sess: &Session,
vars: Variables,
query: S,
) -> Result<Vec<Response>, Error>
where
S: Stream<Item = Result<Bytes, Error>>,
{
if sess.expired() {
return Err(Error::ExpiredSession);
}
self.check_anon(sess).map_err(|_| IamError::NotAllowed {
actor: "anonymous".to_string(),
action: "process".to_string(),
resource: "query".to_string(),
})?;
let opt = self.setup_options(sess);
let mut ctx = self.setup_ctx()?;
sess.context(&mut ctx);
vars.attach(&mut ctx)?;
let parser_settings = ParserSettings {
references_enabled: ctx
.get_capabilities()
.allows_experimental(&ExperimentalTarget::RecordReferences),
bearer_access_enabled: ctx
.get_capabilities()
.allows_experimental(&ExperimentalTarget::BearerAccess),
..Default::default()
};
let mut statements_stream = StatementStream::new_with_settings(parser_settings);
let mut buffer = BytesMut::new();
let mut parse_size = 4096;
let mut bytes_stream = pin!(query);
let mut complete = false;
let mut filling = true;
let stream = futures::stream::poll_fn(move |cx| loop {
while filling {
let bytes = ready!(bytes_stream.as_mut().poll_next(cx));
let bytes = match bytes {
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
Some(Ok(x)) => x,
None => {
complete = true;
filling = false;
break;
}
};
buffer.extend_from_slice(&bytes);
filling = buffer.len() < parse_size;
}
if complete {
return match statements_stream.parse_complete(&mut buffer) {
Err(e) => Poll::Ready(Some(Err(Error::InvalidQuery(e)))),
Ok(None) => Poll::Ready(None),
Ok(Some(x)) => Poll::Ready(Some(Ok(x))),
};
}
match statements_stream.parse_partial(&mut buffer) {
Err(e) => return Poll::Ready(Some(Err(Error::InvalidQuery(e)))),
Ok(Some(x)) => return Poll::Ready(Some(Ok(x))),
Ok(None) => {
if buffer.len() >= parse_size && parse_size < u32::MAX as usize {
parse_size = (parse_size + 1).next_power_of_two();
}
filling = true;
}
}
});
Executor::execute_stream(self, Arc::new(ctx), opt, stream).await
}
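/// Executes a pre-parsed SurrealQL query against the datastore, checking
/// session expiry and anonymous access before running the statements.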
#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn process(
&self,
ast: Query,
sess: &Session,
vars: Variables,
) -> Result<Vec<Response>, Error> {
if sess.expired() {
return Err(Error::ExpiredSession);
}
self.check_anon(sess).map_err(|_| IamError::NotAllowed {
actor: "anonymous".to_string(),
action: "process".to_string(),
resource: "query".to_string(),
})?;
let opt = self.setup_options(sess);
let mut ctx = self.setup_ctx()?;
sess.context(&mut ctx);
vars.attach(&mut ctx)?;
Executor::execute(self, ctx.freeze(), opt, ast).await
}
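/// Computes a single SurrealQL value against the datastore, committing the
/// transaction only when the value performs writes and the computation
/// succeeds, and cancelling it otherwise.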
#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn compute(
&self,
val: Value,
sess: &Session,
vars: Variables,
) -> Result<Value, Error> {
if sess.expired() {
return Err(Error::ExpiredSession);
}
self.check_anon(sess).map_err(|_| IamError::NotAllowed {
actor: "anonymous".to_string(),
action: "compute".to_string(),
resource: "value".to_string(),
})?;
let mut stack = TreeStack::new();
let opt = self.setup_options(sess);
let mut ctx = MutableContext::default();
ctx.add_capabilities(self.capabilities.clone());
if let Some(timeout) = self.query_timeout {
ctx.add_timeout(timeout)?;
}
if let Some(channel) = &self.notification_channel {
ctx.add_notifications(Some(&channel.0));
}
sess.context(&mut ctx);
vars.attach(&mut ctx)?;
let txn = self.transaction(val.writeable().into(), Optimistic).await?.enclose();
ctx.set_transaction(txn.clone());
let ctx = ctx.freeze();
let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await;
match (res.is_ok(), val.writeable()) {
(true, true) => txn.commit().await?,
(_, _) => txn.cancel().await?,
};
res
}
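/// Evaluates a SurrealQL value by reference. This mirrors
/// [`Self::compute`] but skips the anonymous-access check.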
#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn evaluate(
&self,
val: &Value,
sess: &Session,
vars: Variables,
) -> Result<Value, Error> {
if sess.expired() {
return Err(Error::ExpiredSession);
}
let mut stack = TreeStack::new();
let opt = self.setup_options(sess);
let mut ctx = MutableContext::default();
ctx.add_capabilities(self.capabilities.clone());
if let Some(timeout) = self.query_timeout {
ctx.add_timeout(timeout)?;
}
if let Some(channel) = &self.notification_channel {
ctx.add_notifications(Some(&channel.0));
}
sess.context(&mut ctx);
vars.attach(&mut ctx)?;
let txn = self.transaction(val.writeable().into(), Optimistic).await?.enclose();
ctx.set_transaction(txn.clone());
let ctx = ctx.freeze();
let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await;
match (res.is_ok(), val.writeable()) {
(true, true) => txn.commit().await?,
(_, _) => txn.cancel().await?,
};
res
}
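/// Returns a receiver for live-query notifications, or `None` if the
/// datastore was not created with [`Self::with_notifications`].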
#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
pub fn notifications(&self) -> Option<Receiver<Notification>> {
self.notification_channel.as_ref().map(|v| v.1.clone())
}
#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn import(&self, sql: &str, sess: &Session) -> Result<Vec<Response>, Error> {
if sess.expired() {
return Err(Error::ExpiredSession);
}
self.execute(sql, sess, None).await
}
#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn import_stream<S>(&self, sess: &Session, stream: S) -> Result<Vec<Response>, Error>
where
S: Stream<Item = Result<Bytes, Error>>,
{
if sess.expired() {
return Err(Error::ExpiredSession);
}
self.execute_import(sess, None, stream).await
}
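/// Exports the database for the current session to the given channel
/// using the default export configuration.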
#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn export(
&self,
sess: &Session,
chn: Sender<Vec<u8>>,
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
let cfg = super::export::Config::default();
self.export_with_config(sess, chn, cfg).await
}
#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
pub async fn export_with_config(
&self,
sess: &Session,
chn: Sender<Vec<u8>>,
cfg: export::Config,
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
if sess.expired() {
return Err(Error::ExpiredSession);
}
let (ns, db) = crate::iam::check::check_ns_db(sess)?;
let txn = self.transaction(Read, Optimistic).await?;
Ok(async move {
txn.export(&ns, &db, cfg, chn).await?;
Ok(())
})
}
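/// Checks that the session is allowed to perform the given action on the
/// given resource, skipping the check for anonymous sessions when
/// authentication is disabled.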
#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self, sess))]
pub fn check(&self, sess: &Session, action: Action, resource: Resource) -> Result<(), Error> {
if sess.expired() {
return Err(Error::ExpiredSession);
}
let skip_auth = !self.is_auth_enabled() && sess.au.is_anon();
if !skip_auth {
sess.au.is_allowed(action, &resource)?;
}
Ok(())
}
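/// Builds the query execution options for a session, carrying over the
/// node id, namespace, database, live-query flag, authentication data,
/// strict mode, and whether authentication is enabled.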
pub fn setup_options(&self, sess: &Session) -> Options {
Options::default()
.with_id(self.id)
.with_ns(sess.ns())
.with_db(sess.db())
.with_live(sess.live())
.with_auth(sess.au.clone())
.with_strict(self.strict)
.with_auth_enabled(self.auth_enabled)
}
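/// Builds a mutable execution context from the datastore configuration,
/// attaching the notification channel when one is configured.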
pub fn setup_ctx(&self) -> Result<MutableContext, Error> {
let mut ctx = MutableContext::from_ds(
self.query_timeout,
self.capabilities.clone(),
self.index_stores.clone(),
self.cache.clone(),
#[cfg(not(target_family = "wasm"))]
self.index_builder.clone(),
#[cfg(storage)]
self.temporary_directory.clone(),
)?;
if let Some(channel) = &self.notification_channel {
ctx.add_notifications(Some(&channel.0));
}
Ok(ctx)
}
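/// Rejects anonymous sessions when authentication is enabled and guest
/// access is not permitted by the configured capabilities.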
pub fn check_anon(&self, sess: &Session) -> Result<(), IamError> {
if self.auth_enabled && sess.au.is_anon() && !self.capabilities.allows_guest_access() {
Err(IamError::NotAllowed {
actor: "anonymous".to_string(),
action: String::new(),
resource: String::new(),
})
} else {
Ok(())
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
pub async fn very_deep_query() -> Result<(), Error> {
use crate::kvs::Datastore;
use crate::sql::{Expression, Future, Number, Operator, Value};
use reblessive::{Stack, Stk};
let mut stack = Stack::new();
async fn build_query(stk: &mut Stk, depth: usize) -> Value {
if depth == 0 {
Value::Expression(Box::new(Expression::Binary {
l: Value::Number(Number::Int(1)),
o: Operator::Add,
r: Value::Number(Number::Int(1)),
}))
} else {
let q = stk.run(|stk| build_query(stk, depth - 1)).await;
Value::Future(Box::new(Future::from(q)))
}
}
let val = stack.enter(|stk| build_query(stk, 1000)).finish();
let dbs = Datastore::new("memory").await.unwrap().with_capabilities(Capabilities::all());
let opt = Options::default()
.with_id(dbs.id)
.with_ns(Some("test".into()))
.with_db(Some("test".into()))
.with_live(false)
.with_strict(false)
.with_auth_enabled(false)
.with_max_computation_depth(u32::MAX)
.with_futures(true);
let mut ctx = MutableContext::default();
ctx.add_capabilities(dbs.capabilities.clone());
let txn = dbs.transaction(val.writeable().into(), Optimistic).await?;
ctx.set_transaction(txn.enclose());
let ctx = ctx.freeze();
let mut stack = reblessive::tree::TreeStack::new();
let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await.unwrap();
assert_eq!(res, Value::Number(Number::Int(2)));
Ok(())
}
}