#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
use async_graphql::BatchRequest;
use std::collections::BTreeMap;
use std::sync::Arc;
#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
use crate::dbs::capabilities::ExperimentalTarget;
use crate::err::Error;
use crate::rpc::Data;
use crate::rpc::Method;
use crate::rpc::RpcContext;
use crate::rpc::RpcError;
use crate::{
dbs::{capabilities::MethodTarget, QueryType, Response},
rpc::args::Take,
sql::{
statements::{
CreateStatement, DeleteStatement, InsertStatement, KillStatement, LiveStatement,
RelateStatement, SelectStatement, UpdateStatement, UpsertStatement,
},
Array, Fields, Function, Model, Output, Query, Strand, Value,
},
};
#[allow(async_fn_in_trait)]
pub trait RpcProtocolV2: RpcContext {
async fn execute(&self, method: Method, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_rpc_method(&MethodTarget {
method,
}) {
warn!("Capabilities denied RPC method call attempt, target: '{method}'");
return Err(RpcError::MethodNotAllowed);
}
match method {
Method::Ping => Ok(Value::None.into()),
Method::Info => self.info().await,
Method::Use => self.yuse(params).await,
Method::Signup => self.signup(params).await,
Method::Signin => self.signin(params).await,
Method::Authenticate => self.authenticate(params).await,
Method::Invalidate => self.invalidate().await,
Method::Reset => self.reset().await,
Method::Kill => self.kill(params).await,
Method::Live => self.live(params).await,
Method::Set => self.set(params).await,
Method::Unset => self.unset(params).await,
Method::Select => self.select(params).await,
Method::Insert => self.insert(params).await,
Method::Create => self.create(params).await,
Method::Upsert => self.upsert(params).await,
Method::Update => self.update(params).await,
Method::Merge => self.merge(params).await,
Method::Patch => self.patch(params).await,
Method::Delete => self.delete(params).await,
Method::Version => self.version(params).await,
Method::Query => self.query(params).await,
Method::Relate => self.relate(params).await,
Method::Run => self.run(params).await,
Method::GraphQL => self.graphql(params).await,
Method::InsertRelation => self.insert_relation(params).await,
Method::Unknown => Err(RpcError::MethodNotFound),
}
}
async fn yuse(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let (ns, db) = params.needs_two()?;
let mutex = self.lock().clone();
let guard = mutex.acquire().await;
let mut session = self.session().as_ref().clone();
match ns {
Value::None => (),
Value::Null => session.ns = None,
Value::Strand(ns) => session.ns = Some(ns.0),
_ => {
return Err(RpcError::InvalidParams);
}
}
match db {
Value::None => (),
Value::Null => session.db = None,
Value::Strand(db) => session.db = Some(db.0),
_ => {
return Err(RpcError::InvalidParams);
}
}
if self.session().ns.is_none() && self.session().db.is_some() {
session.db = None;
}
self.set_session(Arc::new(session));
std::mem::drop(guard);
Ok(Value::None.into())
}
async fn signup(&self, params: Array) -> Result<Data, RpcError> {
let Ok(Value::Object(v)) = params.needs_one() else {
return Err(RpcError::InvalidParams);
};
let mutex = self.lock().clone();
let guard = mutex.acquire().await;
let mut session = self.session().clone().as_ref().clone();
let out: Result<Value, Error> =
crate::iam::signup::signup(self.kvs(), &mut session, v).await.map(|v| v.token.into());
self.set_session(Arc::new(session));
std::mem::drop(guard);
out.map(Into::into).map_err(Into::into)
}
async fn signin(&self, params: Array) -> Result<Data, RpcError> {
let Ok(Value::Object(v)) = params.needs_one() else {
return Err(RpcError::InvalidParams);
};
let mutex = self.lock().clone();
let guard = mutex.acquire().await;
let mut session = self.session().clone().as_ref().clone();
let out: Result<Value, Error> = crate::iam::signin::signin(self.kvs(), &mut session, v)
.await
.map(|v| v.token.into());
self.set_session(Arc::new(session));
std::mem::drop(guard);
out.map(Into::into).map_err(Into::into)
}
async fn authenticate(&self, params: Array) -> Result<Data, RpcError> {
let Ok(Value::Strand(token)) = params.needs_one() else {
return Err(RpcError::InvalidParams);
};
let mutex = self.lock().clone();
let guard = mutex.acquire().await;
let mut session = self.session().as_ref().clone();
let out: Result<Value, Error> =
crate::iam::verify::token(self.kvs(), &mut session, &token.0)
.await
.map(|_| Value::None);
self.set_session(Arc::new(session));
std::mem::drop(guard);
out.map_err(Into::into).map(Into::into)
}
async fn invalidate(&self) -> Result<Data, RpcError> {
let mutex = self.lock().clone();
let guard = mutex.acquire().await;
let mut session = self.session().as_ref().clone();
crate::iam::clear::clear(&mut session)?;
self.set_session(Arc::new(session));
std::mem::drop(guard);
Ok(Value::None.into())
}
async fn reset(&self) -> Result<Data, RpcError> {
let mutex = self.lock().clone();
let guard = mutex.acquire().await;
let mut session = self.session().as_ref().clone();
crate::iam::reset::reset(&mut session)?;
self.set_session(Arc::new(session));
std::mem::drop(guard);
self.cleanup_lqs().await;
Ok(Value::None.into())
}
async fn info(&self) -> Result<Data, RpcError> {
let sql = SelectStatement {
expr: Fields::all(),
what: vec![Value::Param("auth".into())].into(),
..Default::default()
}
.into();
let mut res = self.kvs().process(sql, &self.session(), None).await?;
Ok(res.remove(0).result?.first().into())
}
async fn set(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok((Value::Strand(key), val)) = params.needs_one_or_two() else {
return Err(RpcError::InvalidParams);
};
let var = Some(map! {
key.0.clone() => Value::None,
});
match self.kvs().compute(val, &self.session(), var).await? {
Value::None => {
let mutex = self.lock().clone();
let guard = mutex.acquire().await;
let mut session = self.session().as_ref().clone();
session.parameters.remove(&key.0);
self.set_session(Arc::new(session));
std::mem::drop(guard);
}
v => {
let mutex = self.lock().clone();
let guard = mutex.acquire().await;
let mut session = self.session().as_ref().clone();
session.parameters.insert(key.0, v);
self.set_session(Arc::new(session));
std::mem::drop(guard);
}
};
Ok(Value::Null.into())
}
async fn unset(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok(Value::Strand(key)) = params.needs_one() else {
return Err(RpcError::InvalidParams);
};
let mutex = self.lock().clone();
let guard = mutex.acquire().await;
let mut session = self.session().as_ref().clone();
session.parameters.remove(&key.0);
self.set_session(Arc::new(session));
std::mem::drop(guard);
Ok(Value::Null.into())
}
async fn kill(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let id = params.needs_one()?;
let sql = KillStatement {
id,
}
.into();
let var = Some(self.session().parameters.clone());
let mut res = self.query_inner(Value::Query(sql), var).await?;
Ok(res.remove(0).result?.into())
}
async fn live(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let (what, diff) = params.needs_one_or_two()?;
let sql = LiveStatement::new_from_what_expr(
match diff.is_true() {
true => Fields::default(),
false => Fields::all(),
},
what.could_be_table(),
)
.into();
let var = Some(self.session().parameters.clone());
let mut res = self.query_inner(Value::Query(sql), var).await?;
Ok(res.remove(0).result?.into())
}
async fn select(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok(what) = params.needs_one() else {
return Err(RpcError::InvalidParams);
};
let sql = SelectStatement {
only: what.is_thing_single(),
expr: Fields::all(),
what: vec![what.could_be_table()].into(),
..Default::default()
}
.into();
let var = Some(self.session().parameters.clone());
let mut res = self.kvs().process(sql, &self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn insert(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok((what, data)) = params.needs_two() else {
return Err(RpcError::InvalidParams);
};
let sql = InsertStatement {
into: match what.is_none_or_null() {
false => Some(what.could_be_table()),
true => None,
},
data: crate::sql::Data::SingleExpression(data),
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.session().parameters.clone());
let mut res = self.kvs().process(sql, &self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn insert_relation(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok((what, data)) = params.needs_two() else {
return Err(RpcError::InvalidParams);
};
let sql = InsertStatement {
relation: true,
into: match what.is_none_or_null() {
false => Some(what.could_be_table()),
true => None,
},
data: crate::sql::Data::SingleExpression(data),
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.session().parameters.clone());
let mut res = self.kvs().process(sql, &self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn create(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok((what, data)) = params.needs_one_or_two() else {
return Err(RpcError::InvalidParams);
};
let what = what.could_be_table();
let sql = CreateStatement {
only: what.is_thing_single() || what.is_table(),
what: vec![what.could_be_table()].into(),
data: match data.is_none_or_null() {
false => Some(crate::sql::Data::ContentExpression(data)),
true => None,
},
output: Some(Output::After),
..Default::default()
}
.into();
let mut res = self.kvs().process(sql, &self.session(), None).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn upsert(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok((what, data)) = params.needs_one_or_two() else {
return Err(RpcError::InvalidParams);
};
let sql = UpsertStatement {
only: what.is_thing_single(),
what: vec![what.could_be_table()].into(),
data: match data.is_none_or_null() {
false => Some(crate::sql::Data::ContentExpression(data)),
true => None,
},
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.session().parameters.clone());
let mut res = self.kvs().process(sql, &self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn update(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok((what, data)) = params.needs_one_or_two() else {
return Err(RpcError::InvalidParams);
};
let sql = UpdateStatement {
only: what.is_thing_single(),
what: vec![what.could_be_table()].into(),
data: match data.is_none_or_null() {
false => Some(crate::sql::Data::ContentExpression(data)),
true => None,
},
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.session().parameters.clone());
let mut res = self.kvs().process(sql, &self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn merge(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok((what, data)) = params.needs_one_or_two() else {
return Err(RpcError::InvalidParams);
};
let sql = UpdateStatement {
only: what.is_thing_single(),
what: vec![what.could_be_table()].into(),
data: match data.is_none_or_null() {
false => Some(crate::sql::Data::MergeExpression(data)),
true => None,
},
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.session().parameters.clone());
let mut res = self.kvs().process(sql, &self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn patch(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok((what, data, diff)) = params.needs_one_two_or_three() else {
return Err(RpcError::InvalidParams);
};
let sql = UpdateStatement {
only: what.is_thing_single(),
what: vec![what.could_be_table()].into(),
data: Some(crate::sql::Data::PatchExpression(data)),
output: match diff.is_true() {
true => Some(Output::Diff),
false => Some(Output::After),
},
..Default::default()
}
.into();
let var = Some(self.session().parameters.clone());
let mut res = self.kvs().process(sql, &self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn relate(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok((from, kind, with, data)) = params.needs_three_or_four() else {
return Err(RpcError::InvalidParams);
};
let sql = RelateStatement {
only: from.is_single() && with.is_single(),
from,
kind: kind.could_be_table(),
with,
data: match data.is_none_or_null() {
false => Some(crate::sql::Data::ContentExpression(data)),
true => None,
},
output: Some(Output::After),
..Default::default()
}
.into();
let var = Some(self.session().parameters.clone());
let mut res = self.kvs().process(sql, &self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn delete(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok(what) = params.needs_one() else {
return Err(RpcError::InvalidParams);
};
let sql = DeleteStatement {
only: what.is_thing_single(),
what: vec![what.could_be_table()].into(),
output: Some(Output::Before),
..Default::default()
}
.into();
let var = Some(self.session().parameters.clone());
let mut res = self.kvs().process(sql, &self.session(), var).await?;
Ok(res
.remove(0)
.result
.or_else(|e| match e {
Error::SingleOnlyOutput => Ok(Value::None),
e => Err(e),
})?
.into())
}
async fn version(&self, params: Array) -> Result<Data, RpcError> {
match params.len() {
0 => Ok(self.version_data()),
_ => Err(RpcError::InvalidParams),
}
}
async fn query(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok((query, vars)) = params.needs_one_or_two() else {
return Err(RpcError::InvalidParams);
};
if !(query.is_query() || query.is_strand()) {
return Err(RpcError::InvalidParams);
}
let vars = match vars {
Value::Object(mut v) => Some(mrg! {v.0, self.session().parameters.clone()}),
Value::None | Value::Null => Some(self.session().parameters.clone()),
_ => return Err(RpcError::InvalidParams),
};
self.query_inner(query, vars).await.map(Into::into)
}
async fn run(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
let Ok((name, version, args)) = params.needs_one_two_or_three() else {
return Err(RpcError::InvalidParams);
};
let name = match name {
Value::Strand(Strand(v)) => v,
_ => return Err(RpcError::InvalidParams),
};
let version = match version {
Value::Strand(Strand(v)) => Some(v),
Value::None | Value::Null => None,
_ => return Err(RpcError::InvalidParams),
};
let args = match args {
Value::Array(Array(arr)) => arr,
Value::None | Value::Null => vec![],
_ => return Err(RpcError::InvalidParams),
};
let func: Query = match &name[0..4] {
"fn::" => Function::Custom(name.chars().skip(4).collect(), args).into(),
"ml::" => Model {
name: name.chars().skip(4).collect(),
version: version.ok_or(RpcError::InvalidParams)?,
args,
}
.into(),
_ => Function::Normal(name, args).into(),
};
let var = Some(self.session().parameters.clone());
let mut res = self.kvs().process(func, &self.session(), var).await?;
Ok(res.remove(0).result?.into())
}
#[cfg(any(target_family = "wasm", not(surrealdb_unstable)))]
async fn graphql(&self, _: Array) -> Result<Data, RpcError> {
Err(RpcError::MethodNotFound)
}
#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
async fn graphql(&self, params: Array) -> Result<Data, RpcError> {
if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
return Err(RpcError::MethodNotAllowed);
}
if !self.kvs().get_capabilities().allows_experimental(&ExperimentalTarget::GraphQL) {
return Err(RpcError::BadGQLConfig);
}
use serde::Serialize;
use crate::gql;
if !Self::GQL_SUPPORT {
return Err(RpcError::BadGQLConfig);
}
let Ok((query, options)) = params.needs_one_or_two() else {
return Err(RpcError::InvalidParams);
};
enum GraphQLFormat {
Json,
}
let mut pretty = false;
let mut format = GraphQLFormat::Json;
match options {
Value::Object(o) => {
for (k, v) in o {
match (k.as_str(), v) {
("pretty", Value::Bool(b)) => pretty = b,
("format", Value::Strand(s)) => match s.as_str() {
"json" => format = GraphQLFormat::Json,
_ => return Err(RpcError::InvalidParams),
},
_ => return Err(RpcError::InvalidParams),
}
}
}
Value::None => (),
_ => return Err(RpcError::InvalidParams),
}
let req = match query {
Value::Strand(s) => match format {
GraphQLFormat::Json => {
let tmp: BatchRequest =
serde_json::from_str(s.as_str()).map_err(|_| RpcError::ParseError)?;
tmp.into_single().map_err(|_| RpcError::ParseError)?
}
},
Value::Object(mut o) => {
let mut tmp = match o.remove("query") {
Some(Value::Strand(s)) => async_graphql::Request::new(s),
_ => return Err(RpcError::InvalidParams),
};
match o.remove("variables").or(o.remove("vars")) {
Some(obj @ Value::Object(_)) => {
let gql_vars = gql::schema::sql_value_to_gql_value(obj)
.map_err(|_| RpcError::InvalidRequest)?;
tmp = tmp.variables(async_graphql::Variables::from_value(gql_vars));
}
Some(_) => return Err(RpcError::InvalidParams),
None => {}
}
match o.remove("operationName").or(o.remove("operation")) {
Some(Value::Strand(s)) => tmp = tmp.operation_name(s),
Some(_) => return Err(RpcError::InvalidParams),
None => {}
}
tmp
}
_ => return Err(RpcError::InvalidParams),
};
let schema = self
.graphql_schema_cache()
.get_schema(&self.session())
.await
.map_err(|e| RpcError::Thrown(e.to_string()))?;
let res = schema.execute(req).await;
let out = match pretty {
true => {
let mut buf = Vec::new();
let formatter = serde_json::ser::PrettyFormatter::with_indent(b" ");
let mut ser = serde_json::Serializer::with_formatter(&mut buf, formatter);
res.serialize(&mut ser).ok().and_then(|_| String::from_utf8(buf).ok())
}
false => serde_json::to_string(&res).ok(),
}
.ok_or(RpcError::Thrown("Serialization Error".to_string()))?;
Ok(Value::Strand(out.into()).into())
}
async fn query_inner(
&self,
query: Value,
vars: Option<BTreeMap<String, Value>>,
) -> Result<Vec<Response>, RpcError> {
if !Self::LQ_SUPPORT && self.session().rt {
return Err(RpcError::BadLQConfig);
}
let res = match query {
Value::Query(sql) => self.kvs().process(sql, &self.session(), vars).await?,
Value::Strand(sql) => self.kvs().execute(&sql, &self.session(), vars).await?,
_ => return Err(fail!("Unexpected query type: {query:?}").into()),
};
for response in &res {
self.handle_live_query_results(response).await;
}
Ok(res)
}
async fn handle_live_query_results(&self, res: &Response) {
match &res.query_type {
QueryType::Live => {
if let Ok(Value::Uuid(lqid)) = &res.result {
self.handle_live(&lqid.0).await;
}
}
QueryType::Kill => {
if let Ok(Value::Uuid(lqid)) = &res.result {
self.handle_kill(&lqid.0).await;
}
}
_ => {}
}
}
}