#[cfg(not(target_arch = "wasm32"))]
pub(crate) mod native;
#[cfg(target_arch = "wasm32")]
pub(crate) mod wasm;
use crate::api::conn::DbResponse;
use crate::api::conn::Method;
#[cfg(not(target_arch = "wasm32"))]
use crate::api::conn::MlConfig;
use crate::api::conn::Param;
use crate::api::engine::create_statement;
use crate::api::engine::delete_statement;
use crate::api::engine::insert_statement;
use crate::api::engine::merge_statement;
use crate::api::engine::patch_statement;
use crate::api::engine::select_statement;
use crate::api::engine::update_statement;
#[cfg(not(target_arch = "wasm32"))]
use crate::api::err::Error;
use crate::api::Connect;
use crate::api::Response as QueryResponse;
use crate::api::Result;
use crate::api::Surreal;
use crate::dbs::Notification;
use crate::dbs::Response;
use crate::dbs::Session;
#[cfg(feature = "ml")]
#[cfg(not(target_arch = "wasm32"))]
use crate::iam::check::check_ns_db;
#[cfg(feature = "ml")]
#[cfg(not(target_arch = "wasm32"))]
use crate::iam::Action;
#[cfg(feature = "ml")]
#[cfg(not(target_arch = "wasm32"))]
use crate::iam::ResourceKind;
use crate::kvs::Datastore;
#[cfg(feature = "ml")]
#[cfg(not(target_arch = "wasm32"))]
use crate::kvs::{LockType, TransactionType};
use crate::method::Stats;
#[cfg(feature = "ml")]
#[cfg(not(target_arch = "wasm32"))]
use crate::ml::storage::surml_file::SurMlFile;
use crate::opt::IntoEndpoint;
#[cfg(feature = "ml")]
#[cfg(not(target_arch = "wasm32"))]
use crate::sql::statements::DefineModelStatement;
#[cfg(feature = "ml")]
#[cfg(not(target_arch = "wasm32"))]
use crate::sql::statements::DefineStatement;
use crate::sql::statements::KillStatement;
use crate::sql::Query;
use crate::sql::Statement;
use crate::sql::Uuid;
use crate::sql::Value;
use channel::Sender;
#[cfg(feature = "ml")]
#[cfg(not(target_arch = "wasm32"))]
use futures::StreamExt;
use indexmap::IndexMap;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::mem;
#[cfg(not(target_arch = "wasm32"))]
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use tokio::fs::OpenOptions;
#[cfg(not(target_arch = "wasm32"))]
use tokio::io;
#[cfg(not(target_arch = "wasm32"))]
use tokio::io::AsyncReadExt;
#[cfg(not(target_arch = "wasm32"))]
use tokio::io::AsyncWriteExt;
const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(10);
#[cfg(feature = "kv-mem")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-mem")))]
#[derive(Debug)]
pub struct Mem;
#[cfg(feature = "kv-rocksdb")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-rocksdb")))]
#[derive(Debug)]
pub struct File;
#[cfg(feature = "kv-rocksdb")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-rocksdb")))]
#[derive(Debug)]
pub struct RocksDb;
#[cfg(feature = "kv-speedb")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-speedb")))]
#[derive(Debug)]
pub struct SpeeDb;
#[cfg(feature = "kv-indxdb")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-indxdb")))]
#[derive(Debug)]
pub struct IndxDb;
#[cfg(feature = "kv-tikv")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-tikv")))]
#[derive(Debug)]
pub struct TiKv;
#[cfg(feature = "kv-fdb")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-fdb")))]
#[derive(Debug)]
pub struct FDb;
#[cfg(feature = "kv-surrealkv")]
#[cfg_attr(docsrs, doc(cfg(feature = "kv-surrealkv")))]
#[derive(Debug)]
pub struct SurrealKV;
#[derive(Debug, Clone)]
pub struct Db {
pub(crate) method: crate::api::conn::Method,
}
impl Surreal<Db> {
pub fn connect<P>(&self, address: impl IntoEndpoint<P, Client = Db>) -> Connect<Db, ()> {
Connect {
router: self.router.clone(),
engine: PhantomData,
address: address.into_endpoint(),
capacity: 0,
waiter: self.waiter.clone(),
response_type: PhantomData,
}
}
}
fn process(responses: Vec<Response>) -> QueryResponse {
let mut map = IndexMap::with_capacity(responses.len());
for (index, response) in responses.into_iter().enumerate() {
let stats = Stats {
execution_time: Some(response.time),
};
match response.result {
Ok(value) => map.insert(index, (stats, Ok(value))),
Err(error) => map.insert(index, (stats, Err(error.into()))),
};
}
QueryResponse {
results: map,
..QueryResponse::new()
}
}
async fn take(one: bool, responses: Vec<Response>) -> Result<Value> {
if let Some((_stats, result)) = process(responses).results.swap_remove(&0) {
let value = result?;
match one {
true => match value {
Value::Array(mut array) => {
if let [value] = &mut array.0[..] {
return Ok(mem::take(value));
}
}
Value::None | Value::Null => {}
value => return Ok(value),
},
false => return Ok(value),
}
}
match one {
true => Ok(Value::None),
false => Ok(Value::Array(Default::default())),
}
}
#[cfg(not(target_arch = "wasm32"))]
async fn export(
kvs: &Datastore,
sess: &Session,
chn: channel::Sender<Vec<u8>>,
ml_config: Option<MlConfig>,
) -> Result<()> {
match ml_config {
#[cfg(feature = "ml")]
Some(MlConfig::Export {
name,
version,
}) => {
let (nsv, dbv) = check_ns_db(sess)?;
kvs.check(sess, Action::View, ResourceKind::Model.on_db(&nsv, &dbv))?;
let mut tx = kvs.transaction(TransactionType::Read, LockType::Optimistic).await?;
let info = tx.get_db_model(&nsv, &dbv, &name, &version).await?;
let mut data = crate::obs::stream(info.hash.to_owned()).await?;
while let Some(Ok(bytes)) = data.next().await {
if chn.send(bytes.to_vec()).await.is_err() {
break;
}
}
}
_ => {
if let Err(error) = kvs.export(sess, chn).await?.await {
if let crate::error::Db::Channel(message) = error {
trace!("{message}");
return Ok(());
}
return Err(error.into());
}
}
}
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
async fn copy<'a, R, W>(
path: PathBuf,
reader: &'a mut R,
writer: &'a mut W,
) -> std::result::Result<(), crate::Error>
where
R: tokio::io::AsyncRead + Unpin + ?Sized,
W: tokio::io::AsyncWrite + Unpin + ?Sized,
{
io::copy(reader, writer).await.map(|_| ()).map_err(|error| {
crate::Error::Api(crate::error::Api::FileRead {
path,
error,
})
})
}
async fn kill_live_query(
kvs: &Datastore,
id: Uuid,
session: &Session,
vars: BTreeMap<String, Value>,
) -> Result<Value> {
let mut query = Query::default();
let mut kill = KillStatement::default();
kill.id = id.into();
query.0 .0 = vec![Statement::Kill(kill)];
let response = kvs.process(query, session, Some(vars)).await?;
take(true, response).await
}
async fn router(
(_, method, param): (i64, Method, Param),
kvs: &Arc<Datastore>,
session: &mut Session,
vars: &mut BTreeMap<String, Value>,
live_queries: &mut HashMap<Uuid, Sender<Notification>>,
) -> Result<DbResponse> {
let mut params = param.other;
match method {
Method::Use => {
match &mut params[..] {
[Value::Strand(ns), Value::Strand(db)] => {
session.ns = Some(mem::take(&mut ns.0));
session.db = Some(mem::take(&mut db.0));
}
[Value::Strand(ns), Value::None] => {
session.ns = Some(mem::take(&mut ns.0));
}
[Value::None, Value::Strand(db)] => {
session.db = Some(mem::take(&mut db.0));
}
_ => unreachable!(),
}
Ok(DbResponse::Other(Value::None))
}
Method::Signup => {
let credentials = match &mut params[..] {
[Value::Object(credentials)] => mem::take(credentials),
_ => unreachable!(),
};
let response = crate::iam::signup::signup(kvs, session, credentials).await?;
Ok(DbResponse::Other(response.into()))
}
Method::Signin => {
let credentials = match &mut params[..] {
[Value::Object(credentials)] => mem::take(credentials),
_ => unreachable!(),
};
let response = crate::iam::signin::signin(kvs, session, credentials).await?;
Ok(DbResponse::Other(response.into()))
}
Method::Authenticate => {
let token = match &mut params[..] {
[Value::Strand(token)] => mem::take(&mut token.0),
_ => unreachable!(),
};
crate::iam::verify::token(kvs, session, &token).await?;
Ok(DbResponse::Other(Value::None))
}
Method::Invalidate => {
crate::iam::clear::clear(session)?;
Ok(DbResponse::Other(Value::None))
}
Method::Create => {
let mut query = Query::default();
let statement = create_statement(&mut params);
query.0 .0 = vec![Statement::Create(statement)];
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(true, response).await?;
Ok(DbResponse::Other(value))
}
Method::Update => {
let mut query = Query::default();
let (one, statement) = update_statement(&mut params);
query.0 .0 = vec![Statement::Update(statement)];
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Insert => {
let mut query = Query::default();
let (one, statement) = insert_statement(&mut params);
query.0 .0 = vec![Statement::Insert(statement)];
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Patch => {
let mut query = Query::default();
let (one, statement) = patch_statement(&mut params);
query.0 .0 = vec![Statement::Update(statement)];
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Merge => {
let mut query = Query::default();
let (one, statement) = merge_statement(&mut params);
query.0 .0 = vec![Statement::Update(statement)];
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Select => {
let mut query = Query::default();
let (one, statement) = select_statement(&mut params);
query.0 .0 = vec![Statement::Select(statement)];
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Delete => {
let mut query = Query::default();
let (one, statement) = delete_statement(&mut params);
query.0 .0 = vec![Statement::Delete(statement)];
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Query => {
let response = match param.query {
Some((query, mut bindings)) => {
let mut vars = vars.clone();
vars.append(&mut bindings);
kvs.process(query, &*session, Some(vars)).await?
}
None => unreachable!(),
};
let response = process(response);
Ok(DbResponse::Query(response))
}
#[cfg(target_arch = "wasm32")]
Method::Export | Method::Import => unreachable!(),
#[cfg(not(target_arch = "wasm32"))]
Method::Export => {
let (tx, rx) = crate::channel::bounded(1);
match (param.file, param.bytes_sender) {
(Some(path), None) => {
let (mut writer, mut reader) = io::duplex(10_240);
let export = export(kvs, session, tx, param.ml_config);
let bridge = async move {
while let Ok(value) = rx.recv().await {
if writer.write_all(&value).await.is_err() {
break;
}
}
Ok(())
};
let mut output = match OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&path)
.await
{
Ok(path) => path,
Err(error) => {
return Err(Error::FileOpen {
path,
error,
}
.into());
}
};
let copy = copy(path, &mut reader, &mut output);
tokio::try_join!(export, bridge, copy)?;
}
(None, Some(backup)) => {
let kvs = kvs.clone();
let session = session.clone();
tokio::spawn(async move {
let export = async {
if let Err(error) = export(&kvs, &session, tx, param.ml_config).await {
let _ = backup.send(Err(error)).await;
}
};
let bridge = async {
while let Ok(bytes) = rx.recv().await {
if backup.send(Ok(bytes)).await.is_err() {
break;
}
}
};
tokio::join!(export, bridge);
});
}
_ => unreachable!(),
}
Ok(DbResponse::Other(Value::None))
}
#[cfg(not(target_arch = "wasm32"))]
Method::Import => {
let path = param.file.expect("file to import from");
let mut file = match OpenOptions::new().read(true).open(&path).await {
Ok(path) => path,
Err(error) => {
return Err(Error::FileOpen {
path,
error,
}
.into());
}
};
let responses = match param.ml_config {
#[cfg(feature = "ml")]
Some(MlConfig::Import) => {
let (nsv, dbv) = check_ns_db(session)?;
kvs.check(session, Action::Edit, ResourceKind::Model.on_db(&nsv, &dbv))?;
let mut buffer = Vec::new();
if let Err(error) = file.read_to_end(&mut buffer).await {
return Err(Error::FileRead {
path,
error,
}
.into());
}
let file = match SurMlFile::from_bytes(buffer) {
Ok(file) => file,
Err(error) => {
return Err(Error::FileRead {
path,
error: io::Error::new(
io::ErrorKind::InvalidData,
error.message.to_string(),
),
}
.into());
}
};
let data = file.to_bytes();
let hash = crate::obs::hash(&data);
crate::obs::put(&hash, data).await?;
let mut model = DefineModelStatement::default();
model.name = file.header.name.to_string().into();
model.version = file.header.version.to_string();
model.comment = Some(file.header.description.to_string().into());
model.hash = hash;
let query = DefineStatement::Model(model).into();
kvs.process(query, session, Some(vars.clone())).await?
}
_ => {
let mut statements = String::new();
if let Err(error) = file.read_to_string(&mut statements).await {
return Err(Error::FileRead {
path,
error,
}
.into());
}
kvs.execute(&statements, &*session, Some(vars.clone())).await?
}
};
for response in responses {
response.result?;
}
Ok(DbResponse::Other(Value::None))
}
Method::Health => Ok(DbResponse::Other(Value::None)),
Method::Version => Ok(DbResponse::Other(crate::env::VERSION.into())),
Method::Set => {
let (key, value) = match &mut params[..2] {
[Value::Strand(key), value] => (mem::take(&mut key.0), mem::take(value)),
_ => unreachable!(),
};
let var = Some(crate::map! {
key.clone() => Value::None,
=> vars
});
match kvs.compute(value, &*session, var).await? {
Value::None => vars.remove(&key),
v => vars.insert(key, v),
};
Ok(DbResponse::Other(Value::None))
}
Method::Unset => {
if let [Value::Strand(key)] = ¶ms[..1] {
vars.remove(&key.0);
}
Ok(DbResponse::Other(Value::None))
}
Method::Live => {
if let Some(sender) = param.notification_sender {
if let [Value::Uuid(id)] = ¶ms[..1] {
live_queries.insert(*id, sender);
}
}
Ok(DbResponse::Other(Value::None))
}
Method::Kill => {
let id = match ¶ms[..] {
[Value::Uuid(id)] => *id,
_ => unreachable!(),
};
live_queries.remove(&id);
let value = kill_live_query(kvs, id, session, vars.clone()).await?;
Ok(DbResponse::Other(value))
}
}
}