1#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
2use async_graphql::BatchRequest;
3use std::collections::BTreeMap;
4use std::sync::Arc;
5
6#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
7use crate::dbs::capabilities::ExperimentalTarget;
8use crate::err::Error;
9use crate::rpc::Data;
10use crate::rpc::Method;
11use crate::rpc::RpcContext;
12use crate::rpc::RpcError;
13use crate::{
14 dbs::{capabilities::MethodTarget, QueryType, Response},
15 rpc::args::Take,
16 sql::{
17 statements::{
18 CreateStatement, DeleteStatement, InsertStatement, KillStatement, LiveStatement,
19 RelateStatement, SelectStatement, UpdateStatement, UpsertStatement,
20 },
21 Array, Fields, Function, Model, Output, Query, Strand, Value,
22 },
23};
24
25#[allow(async_fn_in_trait)]
26pub trait RpcProtocolV2: RpcContext {
27 async fn execute(&self, method: Method, params: Array) -> Result<Data, RpcError> {
33 if !self.kvs().allows_rpc_method(&MethodTarget {
35 method,
36 }) {
37 warn!("Capabilities denied RPC method call attempt, target: '{method}'");
38 return Err(RpcError::MethodNotAllowed);
39 }
40 match method {
42 Method::Ping => Ok(Value::None.into()),
43 Method::Info => self.info().await,
44 Method::Use => self.yuse(params).await,
45 Method::Signup => self.signup(params).await,
46 Method::Signin => self.signin(params).await,
47 Method::Authenticate => self.authenticate(params).await,
48 Method::Invalidate => self.invalidate().await,
49 Method::Reset => self.reset().await,
50 Method::Kill => self.kill(params).await,
51 Method::Live => self.live(params).await,
52 Method::Set => self.set(params).await,
53 Method::Unset => self.unset(params).await,
54 Method::Select => self.select(params).await,
55 Method::Insert => self.insert(params).await,
56 Method::Create => self.create(params).await,
57 Method::Upsert => self.upsert(params).await,
58 Method::Update => self.update(params).await,
59 Method::Merge => self.merge(params).await,
60 Method::Patch => self.patch(params).await,
61 Method::Delete => self.delete(params).await,
62 Method::Version => self.version(params).await,
63 Method::Query => self.query(params).await,
64 Method::Relate => self.relate(params).await,
65 Method::Run => self.run(params).await,
66 Method::GraphQL => self.graphql(params).await,
67 Method::InsertRelation => self.insert_relation(params).await,
68 Method::Unknown => Err(RpcError::MethodNotFound),
69 }
70 }
71
72 async fn yuse(&self, params: Array) -> Result<Data, RpcError> {
77 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
79 return Err(RpcError::MethodNotAllowed);
80 }
81 let (ns, db) = params.needs_two()?;
85 let mutex = self.lock().clone();
87 let guard = mutex.acquire().await;
89 let mut session = self.session().as_ref().clone();
91 match ns {
93 Value::None => (),
94 Value::Null => session.ns = None,
95 Value::Strand(ns) => session.ns = Some(ns.0),
96 _ => {
97 return Err(RpcError::InvalidParams);
98 }
99 }
100 match db {
102 Value::None => (),
103 Value::Null => session.db = None,
104 Value::Strand(db) => session.db = Some(db.0),
105 _ => {
106 return Err(RpcError::InvalidParams);
107 }
108 }
109 if self.session().ns.is_none() && self.session().db.is_some() {
111 session.db = None;
112 }
113 self.set_session(Arc::new(session));
115 std::mem::drop(guard);
117 Ok(Value::None.into())
119 }
120
121 async fn signup(&self, params: Array) -> Result<Data, RpcError> {
124 let Ok(Value::Object(v)) = params.needs_one() else {
126 return Err(RpcError::InvalidParams);
127 };
128 let mutex = self.lock().clone();
130 let guard = mutex.acquire().await;
132 let mut session = self.session().clone().as_ref().clone();
134 let out: Result<Value, Error> =
136 crate::iam::signup::signup(self.kvs(), &mut session, v).await.map(|v| v.token.into());
137 self.set_session(Arc::new(session));
139 std::mem::drop(guard);
141 out.map(Into::into).map_err(Into::into)
143 }
144
145 async fn signin(&self, params: Array) -> Result<Data, RpcError> {
148 let Ok(Value::Object(v)) = params.needs_one() else {
150 return Err(RpcError::InvalidParams);
151 };
152 let mutex = self.lock().clone();
154 let guard = mutex.acquire().await;
156 let mut session = self.session().clone().as_ref().clone();
158 let out: Result<Value, Error> = crate::iam::signin::signin(self.kvs(), &mut session, v)
160 .await
161 .map(|v| v.token.into());
163 self.set_session(Arc::new(session));
165 std::mem::drop(guard);
167 out.map(Into::into).map_err(Into::into)
169 }
170
171 async fn authenticate(&self, params: Array) -> Result<Data, RpcError> {
172 let Ok(Value::Strand(token)) = params.needs_one() else {
174 return Err(RpcError::InvalidParams);
175 };
176 let mutex = self.lock().clone();
178 let guard = mutex.acquire().await;
180 let mut session = self.session().as_ref().clone();
182 let out: Result<Value, Error> =
184 crate::iam::verify::token(self.kvs(), &mut session, &token.0)
185 .await
186 .map(|_| Value::None);
187 self.set_session(Arc::new(session));
189 std::mem::drop(guard);
191 out.map_err(Into::into).map(Into::into)
193 }
194
195 async fn invalidate(&self) -> Result<Data, RpcError> {
196 let mutex = self.lock().clone();
198 let guard = mutex.acquire().await;
200 let mut session = self.session().as_ref().clone();
202 crate::iam::clear::clear(&mut session)?;
204 self.set_session(Arc::new(session));
206 std::mem::drop(guard);
208 Ok(Value::None.into())
210 }
211
212 async fn reset(&self) -> Result<Data, RpcError> {
213 let mutex = self.lock().clone();
215 let guard = mutex.acquire().await;
217 let mut session = self.session().as_ref().clone();
219 crate::iam::reset::reset(&mut session)?;
221 self.set_session(Arc::new(session));
223 std::mem::drop(guard);
225 self.cleanup_lqs().await;
227 Ok(Value::None.into())
229 }
230
231 async fn info(&self) -> Result<Data, RpcError> {
236 let sql = SelectStatement {
238 expr: Fields::all(),
239 what: vec![Value::Param("auth".into())].into(),
240 ..Default::default()
241 }
242 .into();
243 let mut res = self.kvs().process(sql, &self.session(), None).await?;
245 Ok(res.remove(0).result?.first().into())
247 }
248
249 async fn set(&self, params: Array) -> Result<Data, RpcError> {
254 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
256 return Err(RpcError::MethodNotAllowed);
257 }
258 let Ok((Value::Strand(key), val)) = params.needs_one_or_two() else {
260 return Err(RpcError::InvalidParams);
261 };
262 let var = Some(map! {
264 key.0.clone() => Value::None,
265 });
266 match self.kvs().compute(val, &self.session(), var).await? {
268 Value::None => {
270 let mutex = self.lock().clone();
272 let guard = mutex.acquire().await;
274 let mut session = self.session().as_ref().clone();
276 session.parameters.remove(&key.0);
278 self.set_session(Arc::new(session));
280 std::mem::drop(guard);
282 }
283 v => {
285 let mutex = self.lock().clone();
287 let guard = mutex.acquire().await;
289 let mut session = self.session().as_ref().clone();
291 session.parameters.insert(key.0, v);
293 self.set_session(Arc::new(session));
295 std::mem::drop(guard);
297 }
298 };
299 Ok(Value::Null.into())
301 }
302
303 async fn unset(&self, params: Array) -> Result<Data, RpcError> {
304 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
306 return Err(RpcError::MethodNotAllowed);
307 }
308 let Ok(Value::Strand(key)) = params.needs_one() else {
310 return Err(RpcError::InvalidParams);
311 };
312 let mutex = self.lock().clone();
314 let guard = mutex.acquire().await;
316 let mut session = self.session().as_ref().clone();
318 session.parameters.remove(&key.0);
320 self.set_session(Arc::new(session));
322 std::mem::drop(guard);
324 Ok(Value::Null.into())
326 }
327
328 async fn kill(&self, params: Array) -> Result<Data, RpcError> {
333 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
335 return Err(RpcError::MethodNotAllowed);
336 }
337 let id = params.needs_one()?;
339 let sql = KillStatement {
341 id,
342 }
343 .into();
344 let var = Some(self.session().parameters.clone());
346 let mut res = self.query_inner(Value::Query(sql), var).await?;
348 Ok(res.remove(0).result?.into())
350 }
351
352 async fn live(&self, params: Array) -> Result<Data, RpcError> {
353 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
355 return Err(RpcError::MethodNotAllowed);
356 }
357 let (what, diff) = params.needs_one_or_two()?;
359 let sql = LiveStatement::new_from_what_expr(
361 match diff.is_true() {
362 true => Fields::default(),
363 false => Fields::all(),
364 },
365 what.could_be_table(),
366 )
367 .into();
368 let var = Some(self.session().parameters.clone());
370 let mut res = self.query_inner(Value::Query(sql), var).await?;
372 Ok(res.remove(0).result?.into())
374 }
375
376 async fn select(&self, params: Array) -> Result<Data, RpcError> {
381 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
383 return Err(RpcError::MethodNotAllowed);
384 }
385 let Ok(what) = params.needs_one() else {
387 return Err(RpcError::InvalidParams);
388 };
389 let sql = SelectStatement {
391 only: what.is_thing_single(),
392 expr: Fields::all(),
393 what: vec![what.could_be_table()].into(),
394 ..Default::default()
395 }
396 .into();
397 let var = Some(self.session().parameters.clone());
399 let mut res = self.kvs().process(sql, &self.session(), var).await?;
401 Ok(res
403 .remove(0)
404 .result
405 .or_else(|e| match e {
406 Error::SingleOnlyOutput => Ok(Value::None),
407 e => Err(e),
408 })?
409 .into())
410 }
411
412 async fn insert(&self, params: Array) -> Result<Data, RpcError> {
417 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
419 return Err(RpcError::MethodNotAllowed);
420 }
421 let Ok((what, data)) = params.needs_two() else {
423 return Err(RpcError::InvalidParams);
424 };
425 let sql = InsertStatement {
427 into: match what.is_none_or_null() {
428 false => Some(what.could_be_table()),
429 true => None,
430 },
431 data: crate::sql::Data::SingleExpression(data),
432 output: Some(Output::After),
433 ..Default::default()
434 }
435 .into();
436 let var = Some(self.session().parameters.clone());
438 let mut res = self.kvs().process(sql, &self.session(), var).await?;
440 Ok(res
442 .remove(0)
443 .result
444 .or_else(|e| match e {
445 Error::SingleOnlyOutput => Ok(Value::None),
446 e => Err(e),
447 })?
448 .into())
449 }
450
451 async fn insert_relation(&self, params: Array) -> Result<Data, RpcError> {
452 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
454 return Err(RpcError::MethodNotAllowed);
455 }
456 let Ok((what, data)) = params.needs_two() else {
458 return Err(RpcError::InvalidParams);
459 };
460 let sql = InsertStatement {
462 relation: true,
463 into: match what.is_none_or_null() {
464 false => Some(what.could_be_table()),
465 true => None,
466 },
467 data: crate::sql::Data::SingleExpression(data),
468 output: Some(Output::After),
469 ..Default::default()
470 }
471 .into();
472 let var = Some(self.session().parameters.clone());
474 let mut res = self.kvs().process(sql, &self.session(), var).await?;
476 Ok(res
478 .remove(0)
479 .result
480 .or_else(|e| match e {
481 Error::SingleOnlyOutput => Ok(Value::None),
482 e => Err(e),
483 })?
484 .into())
485 }
486
487 async fn create(&self, params: Array) -> Result<Data, RpcError> {
492 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
494 return Err(RpcError::MethodNotAllowed);
495 }
496 let Ok((what, data)) = params.needs_one_or_two() else {
498 return Err(RpcError::InvalidParams);
499 };
500 let what = what.could_be_table();
501 let sql = CreateStatement {
503 only: what.is_thing_single() || what.is_table(),
504 what: vec![what.could_be_table()].into(),
505 data: match data.is_none_or_null() {
506 false => Some(crate::sql::Data::ContentExpression(data)),
507 true => None,
508 },
509 output: Some(Output::After),
510 ..Default::default()
511 }
512 .into();
513 let mut res = self.kvs().process(sql, &self.session(), None).await?;
515 Ok(res
517 .remove(0)
518 .result
519 .or_else(|e| match e {
520 Error::SingleOnlyOutput => Ok(Value::None),
521 e => Err(e),
522 })?
523 .into())
524 }
525
526 async fn upsert(&self, params: Array) -> Result<Data, RpcError> {
531 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
533 return Err(RpcError::MethodNotAllowed);
534 }
535 let Ok((what, data)) = params.needs_one_or_two() else {
537 return Err(RpcError::InvalidParams);
538 };
539 let sql = UpsertStatement {
541 only: what.is_thing_single(),
542 what: vec![what.could_be_table()].into(),
543 data: match data.is_none_or_null() {
544 false => Some(crate::sql::Data::ContentExpression(data)),
545 true => None,
546 },
547 output: Some(Output::After),
548 ..Default::default()
549 }
550 .into();
551 let var = Some(self.session().parameters.clone());
553 let mut res = self.kvs().process(sql, &self.session(), var).await?;
555 Ok(res
557 .remove(0)
558 .result
559 .or_else(|e| match e {
560 Error::SingleOnlyOutput => Ok(Value::None),
561 e => Err(e),
562 })?
563 .into())
564 }
565
566 async fn update(&self, params: Array) -> Result<Data, RpcError> {
571 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
573 return Err(RpcError::MethodNotAllowed);
574 }
575 let Ok((what, data)) = params.needs_one_or_two() else {
577 return Err(RpcError::InvalidParams);
578 };
579 let sql = UpdateStatement {
581 only: what.is_thing_single(),
582 what: vec![what.could_be_table()].into(),
583 data: match data.is_none_or_null() {
584 false => Some(crate::sql::Data::ContentExpression(data)),
585 true => None,
586 },
587 output: Some(Output::After),
588 ..Default::default()
589 }
590 .into();
591 let var = Some(self.session().parameters.clone());
593 let mut res = self.kvs().process(sql, &self.session(), var).await?;
595 Ok(res
597 .remove(0)
598 .result
599 .or_else(|e| match e {
600 Error::SingleOnlyOutput => Ok(Value::None),
601 e => Err(e),
602 })?
603 .into())
604 }
605
606 async fn merge(&self, params: Array) -> Result<Data, RpcError> {
611 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
613 return Err(RpcError::MethodNotAllowed);
614 }
615 let Ok((what, data)) = params.needs_one_or_two() else {
617 return Err(RpcError::InvalidParams);
618 };
619 let sql = UpdateStatement {
621 only: what.is_thing_single(),
622 what: vec![what.could_be_table()].into(),
623 data: match data.is_none_or_null() {
624 false => Some(crate::sql::Data::MergeExpression(data)),
625 true => None,
626 },
627 output: Some(Output::After),
628 ..Default::default()
629 }
630 .into();
631 let var = Some(self.session().parameters.clone());
633 let mut res = self.kvs().process(sql, &self.session(), var).await?;
635 Ok(res
637 .remove(0)
638 .result
639 .or_else(|e| match e {
640 Error::SingleOnlyOutput => Ok(Value::None),
641 e => Err(e),
642 })?
643 .into())
644 }
645
646 async fn patch(&self, params: Array) -> Result<Data, RpcError> {
651 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
653 return Err(RpcError::MethodNotAllowed);
654 }
655 let Ok((what, data, diff)) = params.needs_one_two_or_three() else {
657 return Err(RpcError::InvalidParams);
658 };
659 let sql = UpdateStatement {
661 only: what.is_thing_single(),
662 what: vec![what.could_be_table()].into(),
663 data: Some(crate::sql::Data::PatchExpression(data)),
664 output: match diff.is_true() {
665 true => Some(Output::Diff),
666 false => Some(Output::After),
667 },
668 ..Default::default()
669 }
670 .into();
671 let var = Some(self.session().parameters.clone());
673 let mut res = self.kvs().process(sql, &self.session(), var).await?;
675 Ok(res
677 .remove(0)
678 .result
679 .or_else(|e| match e {
680 Error::SingleOnlyOutput => Ok(Value::None),
681 e => Err(e),
682 })?
683 .into())
684 }
685
686 async fn relate(&self, params: Array) -> Result<Data, RpcError> {
691 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
693 return Err(RpcError::MethodNotAllowed);
694 }
695 let Ok((from, kind, with, data)) = params.needs_three_or_four() else {
697 return Err(RpcError::InvalidParams);
698 };
699 let sql = RelateStatement {
701 only: from.is_single() && with.is_single(),
702 from,
703 kind: kind.could_be_table(),
704 with,
705 data: match data.is_none_or_null() {
706 false => Some(crate::sql::Data::ContentExpression(data)),
707 true => None,
708 },
709 output: Some(Output::After),
710 ..Default::default()
711 }
712 .into();
713 let var = Some(self.session().parameters.clone());
715 let mut res = self.kvs().process(sql, &self.session(), var).await?;
717 Ok(res
719 .remove(0)
720 .result
721 .or_else(|e| match e {
722 Error::SingleOnlyOutput => Ok(Value::None),
723 e => Err(e),
724 })?
725 .into())
726 }
727
728 async fn delete(&self, params: Array) -> Result<Data, RpcError> {
733 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
735 return Err(RpcError::MethodNotAllowed);
736 }
737 let Ok(what) = params.needs_one() else {
739 return Err(RpcError::InvalidParams);
740 };
741 let sql = DeleteStatement {
743 only: what.is_thing_single(),
744 what: vec![what.could_be_table()].into(),
745 output: Some(Output::Before),
746 ..Default::default()
747 }
748 .into();
749 let var = Some(self.session().parameters.clone());
751 let mut res = self.kvs().process(sql, &self.session(), var).await?;
753 Ok(res
755 .remove(0)
756 .result
757 .or_else(|e| match e {
758 Error::SingleOnlyOutput => Ok(Value::None),
759 e => Err(e),
760 })?
761 .into())
762 }
763
764 async fn version(&self, params: Array) -> Result<Data, RpcError> {
769 match params.len() {
770 0 => Ok(self.version_data()),
771 _ => Err(RpcError::InvalidParams),
772 }
773 }
774
775 async fn query(&self, params: Array) -> Result<Data, RpcError> {
780 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
782 return Err(RpcError::MethodNotAllowed);
783 }
784 let Ok((query, vars)) = params.needs_one_or_two() else {
786 return Err(RpcError::InvalidParams);
787 };
788 if !(query.is_query() || query.is_strand()) {
790 return Err(RpcError::InvalidParams);
791 }
792 let vars = match vars {
794 Value::Object(mut v) => Some(mrg! {v.0, self.session().parameters.clone()}),
795 Value::None | Value::Null => Some(self.session().parameters.clone()),
796 _ => return Err(RpcError::InvalidParams),
797 };
798 self.query_inner(query, vars).await.map(Into::into)
800 }
801
802 async fn run(&self, params: Array) -> Result<Data, RpcError> {
807 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
809 return Err(RpcError::MethodNotAllowed);
810 }
811 let Ok((name, version, args)) = params.needs_one_two_or_three() else {
813 return Err(RpcError::InvalidParams);
814 };
815 let name = match name {
817 Value::Strand(Strand(v)) => v,
818 _ => return Err(RpcError::InvalidParams),
819 };
820 let version = match version {
822 Value::Strand(Strand(v)) => Some(v),
823 Value::None | Value::Null => None,
824 _ => return Err(RpcError::InvalidParams),
825 };
826 let args = match args {
828 Value::Array(Array(arr)) => arr,
829 Value::None | Value::Null => vec![],
830 _ => return Err(RpcError::InvalidParams),
831 };
832 let func: Query = match &name[0..4] {
834 "fn::" => Function::Custom(name.chars().skip(4).collect(), args).into(),
835 "ml::" => Model {
836 name: name.chars().skip(4).collect(),
837 version: version.ok_or(RpcError::InvalidParams)?,
838 args,
839 }
840 .into(),
841 _ => Function::Normal(name, args).into(),
842 };
843 let var = Some(self.session().parameters.clone());
845 let mut res = self.kvs().process(func, &self.session(), var).await?;
847 Ok(res.remove(0).result?.into())
849 }
850
851 #[cfg(any(target_family = "wasm", not(surrealdb_unstable)))]
856 async fn graphql(&self, _: Array) -> Result<Data, RpcError> {
857 Err(RpcError::MethodNotFound)
858 }
859
860 #[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
861 async fn graphql(&self, params: Array) -> Result<Data, RpcError> {
862 if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
864 return Err(RpcError::MethodNotAllowed);
865 }
866 if !self.kvs().get_capabilities().allows_experimental(&ExperimentalTarget::GraphQL) {
867 return Err(RpcError::BadGQLConfig);
868 }
869
870 use serde::Serialize;
871
872 use crate::gql;
873
874 if !Self::GQL_SUPPORT {
875 return Err(RpcError::BadGQLConfig);
876 }
877
878 let Ok((query, options)) = params.needs_one_or_two() else {
879 return Err(RpcError::InvalidParams);
880 };
881
882 enum GraphQLFormat {
883 Json,
884 }
885
886 let mut pretty = false;
888 let mut format = GraphQLFormat::Json;
890 match options {
892 Value::Object(o) => {
894 for (k, v) in o {
895 match (k.as_str(), v) {
896 ("pretty", Value::Bool(b)) => pretty = b,
897 ("format", Value::Strand(s)) => match s.as_str() {
898 "json" => format = GraphQLFormat::Json,
899 _ => return Err(RpcError::InvalidParams),
900 },
901 _ => return Err(RpcError::InvalidParams),
902 }
903 }
904 }
905 Value::None => (),
907 _ => return Err(RpcError::InvalidParams),
909 }
910 let req = match query {
912 Value::Strand(s) => match format {
914 GraphQLFormat::Json => {
915 let tmp: BatchRequest =
916 serde_json::from_str(s.as_str()).map_err(|_| RpcError::ParseError)?;
917 tmp.into_single().map_err(|_| RpcError::ParseError)?
918 }
919 },
920 Value::Object(mut o) => {
922 let mut tmp = match o.remove("query") {
924 Some(Value::Strand(s)) => async_graphql::Request::new(s),
925 _ => return Err(RpcError::InvalidParams),
926 };
927 match o.remove("variables").or(o.remove("vars")) {
929 Some(obj @ Value::Object(_)) => {
930 let gql_vars = gql::schema::sql_value_to_gql_value(obj)
931 .map_err(|_| RpcError::InvalidRequest)?;
932
933 tmp = tmp.variables(async_graphql::Variables::from_value(gql_vars));
934 }
935 Some(_) => return Err(RpcError::InvalidParams),
936 None => {}
937 }
938 match o.remove("operationName").or(o.remove("operation")) {
940 Some(Value::Strand(s)) => tmp = tmp.operation_name(s),
941 Some(_) => return Err(RpcError::InvalidParams),
942 None => {}
943 }
944 tmp
946 }
947 _ => return Err(RpcError::InvalidParams),
949 };
950 let schema = self
952 .graphql_schema_cache()
953 .get_schema(&self.session())
954 .await
955 .map_err(|e| RpcError::Thrown(e.to_string()))?;
956 let res = schema.execute(req).await;
958 let out = match pretty {
960 true => {
961 let mut buf = Vec::new();
962 let formatter = serde_json::ser::PrettyFormatter::with_indent(b" ");
963 let mut ser = serde_json::Serializer::with_formatter(&mut buf, formatter);
964 res.serialize(&mut ser).ok().and_then(|_| String::from_utf8(buf).ok())
965 }
966 false => serde_json::to_string(&res).ok(),
967 }
968 .ok_or(RpcError::Thrown("Serialization Error".to_string()))?;
969 Ok(Value::Strand(out.into()).into())
971 }
972
973 async fn query_inner(
978 &self,
979 query: Value,
980 vars: Option<BTreeMap<String, Value>>,
981 ) -> Result<Vec<Response>, RpcError> {
982 if !Self::LQ_SUPPORT && self.session().rt {
984 return Err(RpcError::BadLQConfig);
985 }
986 let res = match query {
988 Value::Query(sql) => self.kvs().process(sql, &self.session(), vars).await?,
989 Value::Strand(sql) => self.kvs().execute(&sql, &self.session(), vars).await?,
990 _ => return Err(fail!("Unexpected query type: {query:?}").into()),
991 };
992
993 for response in &res {
995 self.handle_live_query_results(response).await;
997 }
998 Ok(res)
1000 }
1001
1002 async fn handle_live_query_results(&self, res: &Response) {
1003 match &res.query_type {
1004 QueryType::Live => {
1005 if let Ok(Value::Uuid(lqid)) = &res.result {
1006 self.handle_live(&lqid.0).await;
1007 }
1008 }
1009 QueryType::Kill => {
1010 if let Ok(Value::Uuid(lqid)) = &res.result {
1011 self.handle_kill(&lqid.0).await;
1012 }
1013 }
1014 _ => {}
1015 }
1016 }
1017}