// surrealdb/api/engine/local/mod.rs
#[cfg(not(target_arch = "wasm32"))]
pub(crate) mod native;
#[cfg(target_arch = "wasm32")]
pub(crate) mod wasm;
use crate::api::conn::DbResponse;
use crate::api::conn::Method;
use crate::api::conn::Param;
use crate::api::engine::create_statement;
use crate::api::engine::delete_statement;
use crate::api::engine::merge_statement;
use crate::api::engine::patch_statement;
use crate::api::engine::select_statement;
use crate::api::engine::update_statement;
#[cfg(not(target_arch = "wasm32"))]
use crate::api::err::Error;
use crate::api::Connect;
use crate::api::Response as QueryResponse;
use crate::api::Result;
use crate::api::Surreal;
use crate::dbs::Notification;
use crate::dbs::Response;
use crate::dbs::Session;
use crate::kvs::Datastore;
use crate::opt::IntoEndpoint;
use crate::sql::statements::KillStatement;
use crate::sql::Array;
use crate::sql::Query;
use crate::sql::Statement;
use crate::sql::Statements;
use crate::sql::Strand;
use crate::sql::Uuid;
use crate::sql::Value;
use channel::Sender;
use indexmap::IndexMap;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::mem;
#[cfg(not(target_arch = "wasm32"))]
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use tokio::fs::OpenOptions;
#[cfg(not(target_arch = "wasm32"))]
use tokio::io;
#[cfg(not(target_arch = "wasm32"))]
use tokio::io::AsyncReadExt;
#[cfg(not(target_arch = "wasm32"))]
use tokio::io::AsyncWriteExt;
/// Default interval between ticks: 10 seconds.
// NOTE(review): not referenced in this part of the file; presumably consumed by the
// `native` / `wasm` submodules declared above — confirm.
const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(10);
/// Unit marker type for the `kv-mem` storage backend (in-memory, going by the feature name).
///
/// Only compiled when the `kv-mem` feature is enabled.
#[cfg(feature = "kv-mem")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-mem")))]
#[derive(Debug)]
pub struct Mem;
/// Unit marker type for a file-based storage backend.
///
/// Only compiled when the `kv-rocksdb` feature is enabled.
// NOTE(review): gated on the same feature as `RocksDb`, so presumably backed by
// RocksDB as well — confirm.
#[cfg(feature = "kv-rocksdb")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-rocksdb")))]
#[derive(Debug)]
pub struct File;
/// Unit marker type for the RocksDB storage backend.
///
/// Only compiled when the `kv-rocksdb` feature is enabled.
#[cfg(feature = "kv-rocksdb")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-rocksdb")))]
#[derive(Debug)]
pub struct RocksDb;
/// Unit marker type for the SpeeDB storage backend.
///
/// Only compiled when the `kv-speedb` feature is enabled.
#[cfg(feature = "kv-speedb")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-speedb")))]
#[derive(Debug)]
pub struct SpeeDb;
/// Unit marker type for the `kv-indxdb` storage backend (IndexedDB, going by the feature name).
///
/// Only compiled when the `kv-indxdb` feature is enabled.
#[cfg(feature = "kv-indxdb")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-indxdb")))]
#[derive(Debug)]
pub struct IndxDb;
/// Unit marker type for the TiKV storage backend.
///
/// Only compiled when the `kv-tikv` feature is enabled.
#[cfg(feature = "kv-tikv")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-tikv")))]
#[derive(Debug)]
pub struct TiKv;
/// Unit marker type for the `kv-fdb` storage backend (FoundationDB, going by the feature name).
///
/// Only compiled when the `kv-fdb` feature is enabled.
#[cfg(feature = "kv-fdb")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-fdb")))]
#[derive(Debug)]
pub struct FDb;
/// Client type for the local (embedded) engines in this module.
///
/// It is the `Client` type that endpoints passed to `Surreal::<Db>::connect` must
/// resolve to.
#[derive(Debug, Clone)]
pub struct Db {
// Request method held by this value.
// NOTE(review): how this field is populated and read is not visible in this file — confirm.
pub(crate) method: crate::api::conn::Method,
}
impl Surreal<Db> {
    /// Prepares a connection to the local engine endpoint described by `address`.
    ///
    /// The returned [`Connect`] holds a clone of this client's router, the endpoint
    /// produced by `address.into_endpoint()`, and a capacity of `0`.
    pub fn connect<P>(&self, address: impl IntoEndpoint<P, Client = Db>) -> Connect<Db, ()> {
        let router = self.router.clone();
        let address = address.into_endpoint();
        Connect {
            router,
            address,
            capacity: 0,
            client: PhantomData,
            response_type: PhantomData,
        }
    }
}
/// Converts raw datastore responses into a [`QueryResponse`].
///
/// Each response is keyed by its position in `responses`; a successful result is kept
/// as-is and a failed one has its error converted with `Into`. The function itself
/// never returns `Err`.
fn process(responses: Vec<Response>) -> Result<QueryResponse> {
    let indexed = responses
        .into_iter()
        .enumerate()
        .map(|(position, response)| (position, response.result.map_err(Into::into)));
    Ok(QueryResponse(indexed.collect()))
}
/// Extracts the value of the first statement's response.
///
/// * When `one` is `false`, the first result is returned unchanged, or an empty array
///   if there is no first response.
/// * When `one` is `true`, a single-element array is unwrapped to its element, any
///   other non-array, non-`None`/`Null` value is returned unchanged, and everything
///   else (no response, `None`, `Null`, or an array whose length is not one) yields
///   `Value::None`.
///
/// An error stored in the first response is propagated.
async fn take(one: bool, responses: Vec<Response>) -> Result<Value> {
    let first = process(responses)?.0.remove(&0);
    let value = match first {
        Some(result) => result?,
        // Nothing was returned at all: hand back the "empty" shape the caller expects.
        None if one => return Ok(Value::None),
        None => return Ok(Value::Array(Array(vec![]))),
    };
    if !one {
        return Ok(value);
    }
    let single = match value {
        Value::Array(Array(mut items)) => match &mut items[..] {
            [only] => mem::take(only),
            _ => Value::None,
        },
        Value::None | Value::Null => Value::None,
        other => other,
    };
    Ok(single)
}
#[cfg(not(target_arch = "wasm32"))]
async fn export(
kvs: &Datastore,
sess: &Session,
ns: String,
db: String,
chn: channel::Sender<Vec<u8>>,
) -> Result<()> {
if let Err(error) = kvs.export(sess, ns, db, chn).await?.await {
if let crate::error::Db::Channel(message) = error {
trace!("{message}");
return Ok(());
}
return Err(error.into());
}
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
async fn copy<'a, R, W>(
path: PathBuf,
reader: &'a mut R,
writer: &'a mut W,
) -> std::result::Result<(), crate::Error>
where
R: tokio::io::AsyncRead + Unpin + ?Sized,
W: tokio::io::AsyncWrite + Unpin + ?Sized,
{
io::copy(reader, writer).await.map(|_| ()).map_err(|error| {
crate::Error::Api(crate::error::Api::FileRead {
path,
error,
})
})
}
/// Executes a single `KILL` statement for the live query `id` and returns its result.
///
/// The statement runs on `kvs` under `session` with `vars` as its variables; the
/// response is reduced with `take(true, ..)`.
async fn kill_live_query(
    kvs: &Datastore,
    id: Uuid,
    session: &Session,
    vars: BTreeMap<String, Value>,
) -> Result<Value> {
    let kill = KillStatement {
        id: id.into(),
    };
    let statements = Statements(vec![Statement::Kill(kill)]);
    let responses = kvs.process(Query(statements), session, Some(vars)).await?;
    take(true, responses).await
}
async fn router(
(_, method, param): (i64, Method, Param),
kvs: &Arc<Datastore>,
session: &mut Session,
vars: &mut BTreeMap<String, Value>,
live_queries: &mut HashMap<Uuid, Sender<Notification>>,
) -> Result<DbResponse> {
let mut params = param.other;
match method {
Method::Use => {
match &mut params[..] {
[Value::Strand(Strand(ns)), Value::Strand(Strand(db))] => {
session.ns = Some(mem::take(ns));
session.db = Some(mem::take(db));
}
[Value::Strand(Strand(ns)), Value::None] => {
session.ns = Some(mem::take(ns));
}
[Value::None, Value::Strand(Strand(db))] => {
session.db = Some(mem::take(db));
}
_ => unreachable!(),
}
Ok(DbResponse::Other(Value::None))
}
Method::Signup => {
let credentials = match &mut params[..] {
[Value::Object(credentials)] => mem::take(credentials),
_ => unreachable!(),
};
let response = crate::iam::signup::signup(kvs, session, credentials).await?;
Ok(DbResponse::Other(response.into()))
}
Method::Signin => {
let credentials = match &mut params[..] {
[Value::Object(credentials)] => mem::take(credentials),
_ => unreachable!(),
};
let response = crate::iam::signin::signin(kvs, session, credentials).await?;
Ok(DbResponse::Other(response.into()))
}
Method::Authenticate => {
let token = match &mut params[..] {
[Value::Strand(Strand(token))] => mem::take(token),
_ => unreachable!(),
};
crate::iam::verify::token(kvs, session, &token).await?;
Ok(DbResponse::Other(Value::None))
}
Method::Invalidate => {
crate::iam::clear::clear(session)?;
Ok(DbResponse::Other(Value::None))
}
Method::Create => {
let statement = create_statement(&mut params);
let query = Query(Statements(vec![Statement::Create(statement)]));
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(true, response).await?;
Ok(DbResponse::Other(value))
}
Method::Update => {
let (one, statement) = update_statement(&mut params);
let query = Query(Statements(vec![Statement::Update(statement)]));
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Patch => {
let (one, statement) = patch_statement(&mut params);
let query = Query(Statements(vec![Statement::Update(statement)]));
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Merge => {
let (one, statement) = merge_statement(&mut params);
let query = Query(Statements(vec![Statement::Update(statement)]));
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Select => {
let (one, statement) = select_statement(&mut params);
let query = Query(Statements(vec![Statement::Select(statement)]));
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Delete => {
let (one, statement) = delete_statement(&mut params);
let query = Query(Statements(vec![Statement::Delete(statement)]));
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Query => {
let response = match param.query {
Some((query, mut bindings)) => {
let mut vars = vars.clone();
vars.append(&mut bindings);
kvs.process(query, &*session, Some(vars)).await?
}
None => unreachable!(),
};
let response = process(response)?;
Ok(DbResponse::Query(response))
}
#[cfg(target_arch = "wasm32")]
Method::Export | Method::Import => unreachable!(),
#[cfg(not(target_arch = "wasm32"))]
Method::Export => {
let ns = session.ns.clone().unwrap_or_default();
let db = session.db.clone().unwrap_or_default();
let (tx, rx) = crate::channel::bounded(1);
match (param.file, param.bytes_sender) {
(Some(path), None) => {
let (mut writer, mut reader) = io::duplex(10_240);
let export = export(kvs, session, ns, db, tx);
let bridge = async move {
while let Ok(value) = rx.recv().await {
if writer.write_all(&value).await.is_err() {
break;
}
}
Ok(())
};
let mut output = match OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&path)
.await
{
Ok(path) => path,
Err(error) => {
return Err(Error::FileOpen {
path,
error,
}
.into());
}
};
let copy = copy(path, &mut reader, &mut output);
tokio::try_join!(export, bridge, copy)?;
}
(None, Some(backup)) => {
let kvs = kvs.clone();
let session = session.clone();
tokio::spawn(async move {
let export = async {
if let Err(error) = export(&kvs, &session, ns, db, tx).await {
let _ = backup.send(Err(error)).await;
}
};
let bridge = async {
while let Ok(bytes) = rx.recv().await {
if backup.send(Ok(bytes)).await.is_err() {
break;
}
}
};
tokio::join!(export, bridge);
});
}
_ => unreachable!(),
}
Ok(DbResponse::Other(Value::None))
}
#[cfg(not(target_arch = "wasm32"))]
Method::Import => {
let path = param.file.expect("file to import from");
let mut file = match OpenOptions::new().read(true).open(&path).await {
Ok(path) => path,
Err(error) => {
return Err(Error::FileOpen {
path,
error,
}
.into());
}
};
let mut statements = String::new();
if let Err(error) = file.read_to_string(&mut statements).await {
return Err(Error::FileRead {
path,
error,
}
.into());
}
let responses = kvs.execute(&statements, &*session, Some(vars.clone())).await?;
for response in responses {
response.result?;
}
Ok(DbResponse::Other(Value::None))
}
Method::Health => Ok(DbResponse::Other(Value::None)),
Method::Version => Ok(DbResponse::Other(crate::env::VERSION.into())),
Method::Set => {
let (key, value) = match &mut params[..2] {
[Value::Strand(Strand(key)), value] => (mem::take(key), mem::take(value)),
_ => unreachable!(),
};
match kvs.compute(value, &*session, Some(vars.clone())).await? {
Value::None => vars.remove(&key),
v => vars.insert(key, v),
};
Ok(DbResponse::Other(Value::None))
}
Method::Unset => {
if let [Value::Strand(Strand(key))] = ¶ms[..1] {
vars.remove(key);
}
Ok(DbResponse::Other(Value::None))
}
Method::Live => {
if let Some(sender) = param.notification_sender {
if let [Value::Uuid(id)] = ¶ms[..1] {
live_queries.insert(*id, sender);
}
}
Ok(DbResponse::Other(Value::None))
}
Method::Kill => {
let id = match ¶ms[..] {
[Value::Uuid(id)] => *id,
_ => unreachable!(),
};
live_queries.remove(&id);
let value = kill_live_query(kvs, id, session, vars.clone()).await?;
Ok(DbResponse::Other(value))
}
}
}