use crate::err::Error;
use std::{collections::BTreeMap, mem};
#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
use async_graphql::BatchRequest;
use uuid::Uuid;
#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
use crate::dbs::capabilities::ExperimentalTarget;
#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
use crate::gql::SchemaCache;
use crate::{
dbs::{capabilities::MethodTarget, QueryType, Response, Session},
kvs::Datastore,
rpc::args::Take,
sql::{
statements::{
CreateStatement, DeleteStatement, InsertStatement, KillStatement, LiveStatement,
RelateStatement, SelectStatement, UpdateStatement, UpsertStatement,
},
Array, Fields, Function, Model, Output, Query, Strand, Value,
},
};
use super::{method::Method, response::Data, rpc_error::RpcError};
#[allow(async_fn_in_trait)]
pub trait RpcContext {
/// Returns the datastore that RPC calls are executed against.
fn kvs(&self) -> &Datastore;
/// Returns the session associated with this RPC context.
fn session(&self) -> &Session;
/// Returns the session mutably, for methods that change the selected
/// namespace/database or the authentication state.
fn session_mut(&mut self) -> &mut Session;
/// Returns the variables stored on this context; they are handed to the
/// datastore together with each query.
fn vars(&self) -> &BTreeMap<String, Value>;
/// Returns the stored variables mutably (used by `set`, `unset` and `reset`).
fn vars_mut(&mut self) -> &mut BTreeMap<String, Value>;
/// Returns the payload that the `version` method sends back.
fn version_data(&self) -> Data;
/// Whether this context supports live queries.
///
/// While this is `false`, `query_inner` rejects sessions whose `rt` flag is set
/// with `RpcError::BadLQConfig`. Implementations that set it to `true` must also
/// override `handle_live`, `handle_kill` and `cleanup_lqs`, whose defaults panic.
const LQ_SUPPORT: bool = false;
/// Hook called with the id of a live query after a `LIVE` response returned a UUID.
///
/// # Panics
///
/// The default implementation always panics via `unimplemented!`; it must be
/// overridden when `LQ_SUPPORT` is `true`.
fn handle_live(&self, _lqid: &Uuid) -> impl std::future::Future<Output = ()> + Send {
    async { unimplemented!("handle_live function must be implemented if LQ_SUPPORT = true") }
}
/// Hook called with the id of a live query after a `KILL` response returned a UUID.
///
/// # Panics
///
/// The default implementation always panics via `unimplemented!`; it must be
/// overridden when `LQ_SUPPORT` is `true`.
fn handle_kill(&self, _lqid: &Uuid) -> impl std::future::Future<Output = ()> + Send {
    async { unimplemented!("handle_kill function must be implemented if LQ_SUPPORT = true") }
}
/// Hook called by `reset` to clean up the live queries of this context.
///
/// # Panics
///
/// The default implementation always panics via `unimplemented!`; it must be
/// overridden when `LQ_SUPPORT` is `true`.
// NOTE(review): `reset` awaits this hook unconditionally, so the default body also
// panics for contexts that leave `LQ_SUPPORT` at `false` — confirm that every
// implementor reachable through `reset` overrides it.
fn cleanup_lqs(&self) -> impl std::future::Future<Output = ()> + Send {
    async { unimplemented!("cleanup_lqs function must be implemented if LQ_SUPPORT = true") }
}
/// Whether this context supports the `graphql` method.
///
/// While this is `false`, `graphql` answers with `RpcError::BadGQLConfig`.
/// Implementations that set it to `true` must also override `graphql_schema_cache`.
#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
const GQL_SUPPORT: bool = false;
/// Returns the schema cache from which `graphql` obtains the schema for a session.
///
/// # Panics
///
/// The default implementation always panics via `unimplemented!`; it must be
/// overridden when `GQL_SUPPORT` is `true`.
#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
fn graphql_schema_cache(&self) -> &SchemaCache {
    unimplemented!("graphql_schema_cache function must be implemented if GQL_SUPPORT = true")
}
/// Dispatches an RPC call for a context that can be borrowed mutably.
///
/// The method is first checked against the datastore's RPC method capabilities;
/// a denied method is logged and answered with `RpcError::MethodNotAllowed`.
/// Every known method is routed to its handler, and `Method::Unknown` yields
/// `RpcError::MethodNotFound`.
async fn execute_mutable(&mut self, method: Method, params: Array) -> Result<Data, RpcError> {
    // Capability gate: refuse the call before any handler runs
    let target = MethodTarget {
        method,
    };
    if !self.kvs().allows_rpc_method(&target) {
        warn!("Capabilities denied RPC method call attempt, target: '{}'", method.to_str());
        return Err(RpcError::MethodNotAllowed);
    }
    match method {
        // Handlers that take no parameters
        Method::Ping => Ok(Value::None.into()),
        Method::Info => self.info().await,
        Method::Invalidate => self.invalidate().await,
        Method::Reset => self.reset().await,
        // Handlers that modify the session or the stored variables
        Method::Use => self.yuse(params).await,
        Method::Signup => self.signup(params).await,
        Method::Signin => self.signin(params).await,
        Method::Authenticate => self.authenticate(params).await,
        Method::Set => self.set(params).await,
        Method::Unset => self.unset(params).await,
        // Live query handlers
        Method::Live => self.live(params).await,
        Method::Kill => self.kill(params).await,
        // Handlers that only need shared access
        Method::Version => self.version(params).await,
        Method::Query => self.query(params).await,
        Method::Run => self.run(params).await,
        Method::GraphQL => self.graphql(params).await,
        Method::Select => self.select(params).await,
        Method::Create => self.create(params).await,
        Method::Insert => self.insert(params).await,
        Method::InsertRelation => self.insert_relation(params).await,
        Method::Upsert => self.upsert(params).await,
        Method::Update => self.update(params).await,
        Method::Merge => self.merge(params).await,
        Method::Patch => self.patch(params).await,
        Method::Relate => self.relate(params).await,
        Method::Delete => self.delete(params).await,
        Method::Unknown => Err(RpcError::MethodNotFound),
    }
}
/// Dispatches an RPC call for a context that can only be borrowed immutably.
///
/// The method is first checked against the datastore's RPC method capabilities;
/// a denied method is logged and answered with `RpcError::MethodNotAllowed`.
/// Only handlers taking `&self` are routed. Everything else — `Method::Unknown`
/// as well as the methods whose handlers need `&mut self` — yields
/// `RpcError::MethodNotFound`.
async fn execute_immutable(&self, method: Method, params: Array) -> Result<Data, RpcError> {
    // Capability gate: refuse the call before any handler runs
    let target = MethodTarget {
        method,
    };
    if !self.kvs().allows_rpc_method(&target) {
        warn!("Capabilities denied RPC method call attempt, target: '{}'", method.to_str());
        return Err(RpcError::MethodNotAllowed);
    }
    match method {
        // Handlers that take no parameters
        Method::Ping => Ok(Value::None.into()),
        Method::Info => self.info().await,
        // Generic handlers
        Method::Version => self.version(params).await,
        Method::Query => self.query(params).await,
        Method::Run => self.run(params).await,
        Method::GraphQL => self.graphql(params).await,
        // Record handlers
        Method::Select => self.select(params).await,
        Method::Create => self.create(params).await,
        Method::Insert => self.insert(params).await,
        Method::InsertRelation => self.insert_relation(params).await,
        Method::Upsert => self.upsert(params).await,
        Method::Update => self.update(params).await,
        Method::Merge => self.merge(params).await,
        Method::Patch => self.patch(params).await,
        Method::Relate => self.relate(params).await,
        Method::Delete => self.delete(params).await,
        // Unknown methods and methods that require mutable access
        _ => Err(RpcError::MethodNotFound),
    }
}
/// Handles the `use` method: selects the namespace and database of the session.
///
/// Expects two parameters, `[ns, db]`. For each of them `NONE` keeps the current
/// selection, `NULL` unsets it and a string selects the given name; any other
/// value is answered with `RpcError::InvalidParams`. The namespace is applied
/// before the database parameter is validated. If no namespace is selected
/// afterwards, the database is unset as well.
async fn yuse(&mut self, params: Array) -> Result<Data, RpcError> {
    let (ns, db) = params.needs_two()?;
    // Apply the namespace parameter
    match ns {
        Value::Strand(name) => self.session_mut().ns = Some(name.0),
        Value::Null => self.session_mut().ns = None,
        Value::None => {}
        _ => return Err(RpcError::InvalidParams),
    }
    // Apply the database parameter
    match db {
        Value::Strand(name) => self.session_mut().db = Some(name.0),
        Value::Null => self.session_mut().db = None,
        Value::None => {}
        _ => return Err(RpcError::InvalidParams),
    }
    // A database cannot stay selected without a namespace
    let session = self.session_mut();
    if session.ns.is_none() {
        session.db = None;
    }
    Ok(Value::None.into())
}
/// Handles the `signup` method.
///
/// Expects a single object parameter, otherwise `RpcError::InvalidParams` is
/// returned. The session is moved out of the context for the duration of the
/// call to `iam::signup::signup` and written back afterwards, whether or not the
/// signup succeeded. On success the `token` of the signup result is returned.
async fn signup(&mut self, params: Array) -> Result<Data, RpcError> {
    let Ok(Value::Object(credentials)) = params.needs_one() else {
        return Err(RpcError::InvalidParams);
    };
    // Temporarily own the session so that it can be borrowed alongside the datastore
    let mut session = mem::take(self.session_mut());
    let outcome = crate::iam::signup::signup(self.kvs(), &mut session, credentials).await;
    // Always put the session back, even when the signup failed
    *self.session_mut() = session;
    match outcome {
        Ok(data) => {
            let token: Value = data.token.into();
            Ok(token.into())
        }
        Err(e) => Err(e.into()),
    }
}
/// Handles the `signin` method.
///
/// Expects a single object parameter, otherwise `RpcError::InvalidParams` is
/// returned. The session is moved out of the context for the duration of the
/// call to `iam::signin::signin` and written back afterwards, whether or not the
/// signin succeeded. On success the `token` of the signin result is returned.
async fn signin(&mut self, params: Array) -> Result<Data, RpcError> {
    let Ok(Value::Object(credentials)) = params.needs_one() else {
        return Err(RpcError::InvalidParams);
    };
    // Temporarily own the session so that it can be borrowed alongside the datastore
    let mut session = mem::take(self.session_mut());
    let outcome = crate::iam::signin::signin(self.kvs(), &mut session, credentials).await;
    // Always put the session back, even when the signin failed
    *self.session_mut() = session;
    match outcome {
        Ok(data) => {
            let token: Value = data.token.into();
            Ok(token.into())
        }
        Err(e) => Err(e.into()),
    }
}
async fn authenticate(&mut self, params: Array) -> Result<Data, RpcError> {
let Ok(Value::Strand(token)) = params.needs_one() else {
return Err(RpcError::InvalidParams);
};
let mut session = mem::take(self.session_mut());
let out: Result<Value, Error> =
crate::iam::verify::token(self.kvs(), &mut session, &token.0)
.await
.map(|_| Value::None);
*self.session_mut() = session;
out.map_err(Into::into).map(Into::into)
}
/// Handles the `invalidate` method: clears the session via `iam::clear::clear`
/// and returns `NONE`.
async fn invalidate(&mut self) -> Result<Data, RpcError> {
    let session = self.session_mut();
    crate::iam::clear::clear(session)?;
    let none = Value::None;
    Ok(none.into())
}
/// Handles the `reset` method.
///
/// Clears the session via `iam::clear::clear`, unsets the selected namespace and
/// database, removes all stored variables and then runs `cleanup_lqs`. Returns
/// `NONE` on success.
///
/// # Errors
///
/// Propagates the error of `iam::clear::clear`; in that case the session,
/// variables and live queries are left untouched.
async fn reset(&mut self) -> Result<Data, RpcError> {
    // Work on the session in place. Moving it out with `mem::take` without ever
    // writing it back would swap the live session for `Session::default()`, which
    // silently discards unrelated fields (for example the `rt` flag checked by
    // `query_inner`) and leaves a blank session behind if clearing fails.
    let session = self.session_mut();
    crate::iam::clear::clear(session)?;
    // Unset the selected namespace and database
    session.ns = None;
    session.db = None;
    // Drop the stored variables
    self.vars_mut().clear();
    // Let the implementation clean up its live queries
    self.cleanup_lqs().await;
    Ok(Value::None.into())
}
/// Handles the `info` method: runs `SELECT * FROM $auth` against the session
/// without extra variables and returns the first value of the result.
async fn info(&self) -> Result<Data, RpcError> {
    // Select every field of the `auth` parameter
    let stmt = SelectStatement {
        expr: Fields::all(),
        what: vec![Value::Param("auth".into())].into(),
        ..Default::default()
    };
    let mut res = self.kvs().process(stmt.into(), self.session(), None).await?;
    // Only the single statement response is of interest
    let auth = res.remove(0).result?.first();
    Ok(auth.into())
}
/// Handles the `set` method: stores a variable on the context.
///
/// Expects `[key, value]` with a string key, otherwise `RpcError::InvalidParams`
/// is returned. The value is computed by the datastore with a variable map built
/// from the key bound to `NONE` together with the currently stored variables. A
/// computed `NONE` removes the variable, anything else is stored under the key.
/// Returns `NULL`.
async fn set(&mut self, params: Array) -> Result<Data, RpcError> {
    let Ok((Value::Strand(key), val)) = params.needs_one_or_two() else {
        return Err(RpcError::InvalidParams);
    };
    // Variables visible while the value is computed
    let var = Some(map! {
        key.0.clone() => Value::None,
        => &self.vars()
    });
    let computed = self.kvs().compute(val, self.session(), var).await?;
    if matches!(computed, Value::None) {
        // Setting a variable to NONE removes it
        self.vars_mut().remove(&key.0);
    } else {
        self.vars_mut().insert(key.0, computed);
    }
    Ok(Value::Null.into())
}
/// Handles the `unset` method: removes the variable named by the single string
/// parameter and returns `NULL`. Any other parameter shape is answered with
/// `RpcError::InvalidParams`.
async fn unset(&mut self, params: Array) -> Result<Data, RpcError> {
    match params.needs_one() {
        Ok(Value::Strand(key)) => {
            self.vars_mut().remove(&key.0);
            Ok(Value::Null.into())
        }
        _ => Err(RpcError::InvalidParams),
    }
}
async fn kill(&mut self, params: Array) -> Result<Data, RpcError> {
let id = params.needs_one()?;
let sql = KillStatement {
id,
}
.into();
let var = Some(self.vars().clone());
let mut res = self.query_inner(Value::Query(sql), var).await?;
Ok(res.remove(0).result?.into())
}
async fn live(&mut self, params: Array) -> Result<Data, RpcError> {
let (what, diff) = params.needs_one_or_two()?;
let sql = LiveStatement::new_from_what_expr(
match diff.is_true() {
true => Fields::default(),
false => Fields::all(),
},
what.could_be_table(),
)
.into();
let var = Some(self.vars().clone());
let mut res = self.query_inner(Value::Query(sql), var).await?;
Ok(res.remove(0).result?.into())
}
async fn select(&self, params: Array) -> Result<Data, RpcError> {
let Ok(what) = params.needs_one() else {
return Err(RpcError::InvalidParams);
};
let sql = SelectStatement {
only: what.is_thing_single(),
expr: Fields::all(),
what: vec![what.could_be_table()].into(),
..Default::default()
}
.into();
let var = Some(self.vars().clone());
let mut res = self.kvs().process(sql, self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn insert(&self, params: Array) -> Result<Data, RpcError> {
let Ok((what, data)) = params.needs_two() else {
return Err(RpcError::InvalidParams);
};
let sql = InsertStatement {
into: match what.is_none_or_null() {
false => Some(what.could_be_table()),
true => None,
},
data: crate::sql::Data::SingleExpression(data),
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.vars().clone());
let mut res = self.kvs().process(sql, self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn insert_relation(&self, params: Array) -> Result<Data, RpcError> {
let Ok((what, data)) = params.needs_two() else {
return Err(RpcError::InvalidParams);
};
let sql = InsertStatement {
relation: true,
into: match what.is_none_or_null() {
false => Some(what.could_be_table()),
true => None,
},
data: crate::sql::Data::SingleExpression(data),
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.vars().clone());
let mut res = self.kvs().process(sql, self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn create(&self, params: Array) -> Result<Data, RpcError> {
let Ok((what, data)) = params.needs_one_or_two() else {
return Err(RpcError::InvalidParams);
};
let what = what.could_be_table();
let sql = CreateStatement {
only: what.is_thing_single() || what.is_table(),
what: vec![what.could_be_table()].into(),
data: match data.is_none_or_null() {
false => Some(crate::sql::Data::ContentExpression(data)),
true => None,
},
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.vars().clone());
let mut res = self.kvs().process(sql, self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn upsert(&self, params: Array) -> Result<Data, RpcError> {
let Ok((what, data)) = params.needs_one_or_two() else {
return Err(RpcError::InvalidParams);
};
let sql = UpsertStatement {
only: what.is_thing_single(),
what: vec![what.could_be_table()].into(),
data: match data.is_none_or_null() {
false => Some(crate::sql::Data::ContentExpression(data)),
true => None,
},
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.vars().clone());
let mut res = self.kvs().process(sql, self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn update(&self, params: Array) -> Result<Data, RpcError> {
let Ok((what, data)) = params.needs_one_or_two() else {
return Err(RpcError::InvalidParams);
};
let sql = UpdateStatement {
only: what.is_thing_single(),
what: vec![what.could_be_table()].into(),
data: match data.is_none_or_null() {
false => Some(crate::sql::Data::ContentExpression(data)),
true => None,
},
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.vars().clone());
let mut res = self.kvs().process(sql, self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn merge(&self, params: Array) -> Result<Data, RpcError> {
let Ok((what, data)) = params.needs_one_or_two() else {
return Err(RpcError::InvalidParams);
};
let sql = UpdateStatement {
only: what.is_thing_single(),
what: vec![what.could_be_table()].into(),
data: match data.is_none_or_null() {
false => Some(crate::sql::Data::MergeExpression(data)),
true => None,
},
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.vars().clone());
let mut res = self.kvs().process(sql, self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn patch(&self, params: Array) -> Result<Data, RpcError> {
let Ok((what, data, diff)) = params.needs_one_two_or_three() else {
return Err(RpcError::InvalidParams);
};
let sql = UpdateStatement {
only: what.is_thing_single(),
what: vec![what.could_be_table()].into(),
data: Some(crate::sql::Data::PatchExpression(data)),
output: match diff.is_true() {
true => Some(Output::Diff),
false => Some(Output::After),
},
..Default::default()
}
.into();
let var = Some(self.vars().clone());
let mut res = self.kvs().process(sql, self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn relate(&self, params: Array) -> Result<Data, RpcError> {
let Ok((from, kind, with, data)) = params.needs_three_or_four() else {
return Err(RpcError::InvalidParams);
};
let sql = RelateStatement {
only: from.is_single() && with.is_single(),
from,
kind: kind.could_be_table(),
with,
data: match data.is_none_or_null() {
false => Some(crate::sql::Data::ContentExpression(data)),
true => None,
},
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.vars().clone());
let mut res = self.kvs().process(sql, self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn delete(&self, params: Array) -> Result<Data, RpcError> {
let Ok(what) = params.needs_one() else {
return Err(RpcError::InvalidParams);
};
let sql = DeleteStatement {
only: what.is_thing_single(),
what: vec![what.could_be_table()].into(),
output: Some(Output::Before),
..Default::default()
}
.into();
let var = Some(self.vars().clone());
let mut res = self.kvs().process(sql, self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
/// Handles the `version` method: returns `version_data()` when called without
/// parameters and `RpcError::InvalidParams` otherwise.
async fn version(&self, params: Array) -> Result<Data, RpcError> {
    // The method does not accept any parameters
    if !params.is_empty() {
        return Err(RpcError::InvalidParams);
    }
    Ok(self.version_data())
}
/// Handles the `query` method.
///
/// Expects `[query, vars]`, where `vars` is optional. The query must be a parsed
/// query or a string. `vars` must be an object, which is merged with the stored
/// variables, or `NONE`/`NULL`, in which case only the stored variables are used.
/// Any other shape is answered with `RpcError::InvalidParams`. The query runs
/// through `query_inner` and all of its responses are returned.
async fn query(&self, params: Array) -> Result<Data, RpcError> {
    let (query, vars) = params.needs_one_or_two().map_err(|_| RpcError::InvalidParams)?;
    // Only parsed queries and query strings can be executed
    if !query.is_query() && !query.is_strand() {
        return Err(RpcError::InvalidParams);
    }
    let vars = match vars {
        Value::None | Value::Null => Some(self.vars().clone()),
        Value::Object(mut given) => Some(mrg! {given.0, &self.vars()}),
        _ => return Err(RpcError::InvalidParams),
    };
    let responses = self.query_inner(query, vars).await?;
    Ok(responses.into())
}
/// Handles the `run` method: invokes a function or a model.
///
/// Expects `[name, version, args]`, where `version` and `args` are optional:
///
/// - `name` must be a string,
/// - `version` must be a string, `NONE` or `NULL`,
/// - `args` must be an array, `NONE` or `NULL` (treated as no arguments).
///
/// A name starting with `fn::` runs the custom function named by the remainder,
/// a name starting with `ml::` runs the model named by the remainder and
/// requires a version, and any other name runs a normal function. The stored
/// variables are passed along, and the result of the call is returned.
///
/// # Errors
///
/// Returns `RpcError::InvalidParams` for a malformed parameter or for a model
/// call without a version, and propagates errors of the datastore.
async fn run(&self, params: Array) -> Result<Data, RpcError> {
    let Ok((name, version, args)) = params.needs_one_two_or_three() else {
        return Err(RpcError::InvalidParams);
    };
    let name = match name {
        Value::Strand(Strand(v)) => v,
        _ => return Err(RpcError::InvalidParams),
    };
    let version = match version {
        Value::Strand(Strand(v)) => Some(v),
        Value::None | Value::Null => None,
        _ => return Err(RpcError::InvalidParams),
    };
    let args = match args {
        Value::Array(Array(arr)) => arr,
        Value::None | Value::Null => vec![],
        _ => return Err(RpcError::InvalidParams),
    };
    // The name is supplied by the client, so it must not be sliced by byte
    // offsets: `&name[0..4]` panics for names shorter than four bytes and for
    // names where byte 4 is not a character boundary. `strip_prefix` performs
    // the same classification without a panic path.
    let func: Query = if let Some(custom) = name.strip_prefix("fn::") {
        Function::Custom(custom.to_owned(), args).into()
    } else if let Some(model) = name.strip_prefix("ml::") {
        Model {
            name: model.to_owned(),
            version: version.ok_or(RpcError::InvalidParams)?,
            args,
        }
        .into()
    } else {
        Function::Normal(name, args).into()
    };
    let vars = Some(self.vars().clone());
    let mut res = self.kvs().process(func, self.session(), vars).await?;
    Ok(res.remove(0).result?.into())
}
/// Fallback for the `graphql` method on builds where GraphQL is compiled out
/// (wasm targets, or builds without `surrealdb_unstable`): the parameters are
/// ignored and `RpcError::MethodNotFound` is returned.
#[cfg(any(target_family = "wasm", not(surrealdb_unstable)))]
async fn graphql(&self, _: Array) -> Result<Data, RpcError> {
    Err(RpcError::MethodNotFound)
}
/// Handles the `graphql` method: executes a GraphQL request against the schema
/// of the current session and returns the response serialized as a JSON string.
///
/// Expects `[query, options]`, where `options` is optional:
///
/// - `query` is either a string holding a JSON encoded request, or an object
///   with a string `query`, optional object `variables` (alias `vars`) and
///   optional string `operationName` (alias `operation`),
/// - `options` is an object that may hold a boolean `pretty` and a string
///   `format`, for which only `"json"` is accepted.
///
/// # Errors
///
/// - `RpcError::BadGQLConfig` when the experimental GraphQL capability is not
///   allowed or `GQL_SUPPORT` is `false`,
/// - `RpcError::InvalidParams` for malformed parameters or options,
/// - `RpcError::ParseError` when a string request cannot be parsed as a single
///   request,
/// - `RpcError::InvalidRequest` when the variables cannot be converted,
/// - `RpcError::Thrown` when the schema cannot be obtained or the response
///   cannot be serialized.
#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
async fn graphql(&self, params: Array) -> Result<Data, RpcError> {
    use crate::gql;
    use serde::Serialize;

    // Wire formats accepted for a textual request
    enum GraphQLFormat {
        Json,
    }

    // Both the experimental capability and the context level support are required
    let experimental_allowed =
        self.kvs().get_capabilities().allows_experimental(&ExperimentalTarget::GraphQL);
    if !experimental_allowed || !Self::GQL_SUPPORT {
        return Err(RpcError::BadGQLConfig);
    }

    let Ok((query, options)) = params.needs_one_or_two() else {
        return Err(RpcError::InvalidParams);
    };

    // Process the options
    let mut pretty = false;
    let mut format = GraphQLFormat::Json;
    match options {
        Value::None => {}
        Value::Object(opts) => {
            for (key, value) in opts {
                match (key.as_str(), value) {
                    ("pretty", Value::Bool(flag)) => pretty = flag,
                    ("format", Value::Strand(name)) if name.as_str() == "json" => {
                        format = GraphQLFormat::Json
                    }
                    // Unknown options, unknown formats and mistyped values
                    _ => return Err(RpcError::InvalidParams),
                }
            }
        }
        _ => return Err(RpcError::InvalidParams),
    }

    // Build the request
    let req = match query {
        Value::Strand(text) => match format {
            GraphQLFormat::Json => {
                let batch: BatchRequest =
                    serde_json::from_str(text.as_str()).map_err(|_| RpcError::ParseError)?;
                batch.into_single().map_err(|_| RpcError::ParseError)?
            }
        },
        Value::Object(mut fields) => {
            let Some(Value::Strand(text)) = fields.remove("query") else {
                return Err(RpcError::InvalidParams);
            };
            let mut request = async_graphql::Request::new(text);

            // Both spellings are taken out of the object, the long one wins
            let variables = fields.remove("variables");
            let vars_alias = fields.remove("vars");
            match variables.or(vars_alias) {
                Some(vars @ Value::Object(_)) => {
                    let converted = gql::schema::sql_value_to_gql_value(vars)
                        .map_err(|_| RpcError::InvalidRequest)?;
                    request = request.variables(async_graphql::Variables::from_value(converted));
                }
                Some(_) => return Err(RpcError::InvalidParams),
                None => {}
            }

            // Both spellings are taken out of the object, the long one wins
            let operation_name = fields.remove("operationName");
            let operation_alias = fields.remove("operation");
            match operation_name.or(operation_alias) {
                Some(Value::Strand(name)) => request = request.operation_name(name),
                Some(_) => return Err(RpcError::InvalidParams),
                None => {}
            }

            request
        }
        _ => return Err(RpcError::InvalidParams),
    };

    // Execute the request against the schema of this session
    let schema = self
        .graphql_schema_cache()
        .get_schema(self.session())
        .await
        .map_err(|e| RpcError::Thrown(e.to_string()))?;
    let res = schema.execute(req).await;

    // Serialize the response
    let serialized = if pretty {
        let mut buf = Vec::new();
        let formatter = serde_json::ser::PrettyFormatter::with_indent(b" ");
        let mut ser = serde_json::Serializer::with_formatter(&mut buf, formatter);
        res.serialize(&mut ser).ok().and_then(|_| String::from_utf8(buf).ok())
    } else {
        serde_json::to_string(&res).ok()
    };
    let out = serialized.ok_or_else(|| RpcError::Thrown("Serialization Error".to_string()))?;
    Ok(Value::Strand(out.into()).into())
}
/// Executes a query and notifies the live query hooks about its responses.
///
/// A `Value::Query` is processed as an already parsed query and a
/// `Value::Strand` is executed as query text; any other value produces an error
/// built with `fail!`. Every response is then passed to
/// `handle_live_query_results`.
///
/// # Errors
///
/// Returns `RpcError::BadLQConfig` when the session has `rt` set although
/// `LQ_SUPPORT` is `false`, and propagates errors of the datastore.
async fn query_inner(
    &self,
    query: Value,
    vars: Option<BTreeMap<String, Value>>,
) -> Result<Vec<Response>, RpcError> {
    // Realtime sessions require live query support
    if self.session().rt && !Self::LQ_SUPPORT {
        return Err(RpcError::BadLQConfig);
    }
    let kvs = self.kvs();
    let res = match query {
        Value::Strand(sql) => kvs.execute(&sql, self.session(), vars).await?,
        Value::Query(sql) => kvs.process(sql, self.session(), vars).await?,
        _ => return Err(fail!("Unexpected query type: {query:?}").into()),
    };
    // Let the implementation track started and killed live queries
    for response in res.iter() {
        self.handle_live_query_results(response).await;
    }
    Ok(res)
}
/// Forwards the outcome of a live query statement to the matching hook.
///
/// When the response holds a UUID result, `handle_live` is called for a `Live`
/// response and `handle_kill` for a `Kill` response. Failed responses, other
/// results and other query types are ignored.
async fn handle_live_query_results(&self, res: &Response) {
    // Only successful responses carrying a live query id are relevant
    let Ok(Value::Uuid(lqid)) = &res.result else {
        return;
    };
    match &res.query_type {
        QueryType::Live => self.handle_live(&lqid.0).await,
        QueryType::Kill => self.handle_kill(&lqid.0).await,
        _ => {}
    }
}
}