surrealdb_core/rpc/protocol/
v2.rs

1#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
2use async_graphql::BatchRequest;
3use std::collections::BTreeMap;
4use std::sync::Arc;
5
6#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
7use crate::dbs::capabilities::ExperimentalTarget;
8use crate::err::Error;
9use crate::rpc::Data;
10use crate::rpc::Method;
11use crate::rpc::RpcContext;
12use crate::rpc::RpcError;
13use crate::{
14	dbs::{capabilities::MethodTarget, QueryType, Response},
15	rpc::args::Take,
16	sql::{
17		statements::{
18			CreateStatement, DeleteStatement, InsertStatement, KillStatement, LiveStatement,
19			RelateStatement, SelectStatement, UpdateStatement, UpsertStatement,
20		},
21		Array, Fields, Function, Model, Output, Query, Strand, Value,
22	},
23};
24
25#[allow(async_fn_in_trait)]
26pub trait RpcProtocolV2: RpcContext {
27	// ------------------------------
28	// Method execution
29	// ------------------------------
30
31	/// Executes a method on this RPC implementation
32	async fn execute(&self, method: Method, params: Array) -> Result<Data, RpcError> {
33		// Check if capabilities allow executing the requested RPC method
34		if !self.kvs().allows_rpc_method(&MethodTarget {
35			method,
36		}) {
37			warn!("Capabilities denied RPC method call attempt, target: '{method}'");
38			return Err(RpcError::MethodNotAllowed);
39		}
40		// Execute the desired method
41		match method {
42			Method::Ping => Ok(Value::None.into()),
43			Method::Info => self.info().await,
44			Method::Use => self.yuse(params).await,
45			Method::Signup => self.signup(params).await,
46			Method::Signin => self.signin(params).await,
47			Method::Authenticate => self.authenticate(params).await,
48			Method::Invalidate => self.invalidate().await,
49			Method::Reset => self.reset().await,
50			Method::Kill => self.kill(params).await,
51			Method::Live => self.live(params).await,
52			Method::Set => self.set(params).await,
53			Method::Unset => self.unset(params).await,
54			Method::Select => self.select(params).await,
55			Method::Insert => self.insert(params).await,
56			Method::Create => self.create(params).await,
57			Method::Upsert => self.upsert(params).await,
58			Method::Update => self.update(params).await,
59			Method::Merge => self.merge(params).await,
60			Method::Patch => self.patch(params).await,
61			Method::Delete => self.delete(params).await,
62			Method::Version => self.version(params).await,
63			Method::Query => self.query(params).await,
64			Method::Relate => self.relate(params).await,
65			Method::Run => self.run(params).await,
66			Method::GraphQL => self.graphql(params).await,
67			Method::InsertRelation => self.insert_relation(params).await,
68			Method::Unknown => Err(RpcError::MethodNotFound),
69		}
70	}
71
72	// ------------------------------
73	// Methods for authentication
74	// ------------------------------
75
76	async fn yuse(&self, params: Array) -> Result<Data, RpcError> {
77		// Check if the user is allowed to query
78		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
79			return Err(RpcError::MethodNotAllowed);
80		}
81		// For both ns+db, string = change, null = unset, none = do nothing
82		// We need to be able to adjust either ns or db without affecting the other
83		// To be able to select a namespace, and then list resources in that namespace, as an example
84		let (ns, db) = params.needs_two()?;
85		// Get the context lock
86		let mutex = self.lock().clone();
87		// Lock the context for update
88		let guard = mutex.acquire().await;
89		// Clone the current session
90		let mut session = self.session().as_ref().clone();
91		// Update the selected namespace
92		match ns {
93			Value::None => (),
94			Value::Null => session.ns = None,
95			Value::Strand(ns) => session.ns = Some(ns.0),
96			_ => {
97				return Err(RpcError::InvalidParams);
98			}
99		}
100		// Update the selected database
101		match db {
102			Value::None => (),
103			Value::Null => session.db = None,
104			Value::Strand(db) => session.db = Some(db.0),
105			_ => {
106				return Err(RpcError::InvalidParams);
107			}
108		}
109		// Clear any residual database
110		if self.session().ns.is_none() && self.session().db.is_some() {
111			session.db = None;
112		}
113		// Store the updated session
114		self.set_session(Arc::new(session));
115		// Drop the mutex guard
116		std::mem::drop(guard);
117		// Return nothing
118		Ok(Value::None.into())
119	}
120
121	// TODO(gguillemas): Update this method in 3.0.0 to return an object instead of a string.
122	// This will allow returning refresh tokens as well as any additional credential resulting from signing up.
123	async fn signup(&self, params: Array) -> Result<Data, RpcError> {
124		// Process the method arguments
125		let Ok(Value::Object(v)) = params.needs_one() else {
126			return Err(RpcError::InvalidParams);
127		};
128		// Get the context lock
129		let mutex = self.lock().clone();
130		// Lock the context for update
131		let guard = mutex.acquire().await;
132		// Clone the current session
133		let mut session = self.session().clone().as_ref().clone();
134		// Attempt signup, mutating the session
135		let out: Result<Value, Error> =
136			crate::iam::signup::signup(self.kvs(), &mut session, v).await.map(|v| v.token.into());
137		// Store the updated session
138		self.set_session(Arc::new(session));
139		// Drop the mutex guard
140		std::mem::drop(guard);
141		// Return the signup result
142		out.map(Into::into).map_err(Into::into)
143	}
144
145	// TODO(gguillemas): Update this method in 3.0.0 to return an object instead of a string.
146	// This will allow returning refresh tokens as well as any additional credential resulting from signing in.
147	async fn signin(&self, params: Array) -> Result<Data, RpcError> {
148		// Process the method arguments
149		let Ok(Value::Object(v)) = params.needs_one() else {
150			return Err(RpcError::InvalidParams);
151		};
152		// Get the context lock
153		let mutex = self.lock().clone();
154		// Lock the context for update
155		let guard = mutex.acquire().await;
156		// Clone the current session
157		let mut session = self.session().clone().as_ref().clone();
158		// Attempt signin, mutating the session
159		let out: Result<Value, Error> = crate::iam::signin::signin(self.kvs(), &mut session, v)
160			.await
161			// The default `signin` method just returns the token
162			.map(|v| v.token.into());
163		// Store the updated session
164		self.set_session(Arc::new(session));
165		// Drop the mutex guard
166		std::mem::drop(guard);
167		// Return the signin result
168		out.map(Into::into).map_err(Into::into)
169	}
170
171	async fn authenticate(&self, params: Array) -> Result<Data, RpcError> {
172		// Process the method arguments
173		let Ok(Value::Strand(token)) = params.needs_one() else {
174			return Err(RpcError::InvalidParams);
175		};
176		// Get the context lock
177		let mutex = self.lock().clone();
178		// Lock the context for update
179		let guard = mutex.acquire().await;
180		// Clone the current session
181		let mut session = self.session().as_ref().clone();
182		// Attempt authentication, mutating the session
183		let out: Result<Value, Error> =
184			crate::iam::verify::token(self.kvs(), &mut session, &token.0)
185				.await
186				.map(|_| Value::None);
187		// Store the updated session
188		self.set_session(Arc::new(session));
189		// Drop the mutex guard
190		std::mem::drop(guard);
191		// Return nothing on success
192		out.map_err(Into::into).map(Into::into)
193	}
194
195	async fn invalidate(&self) -> Result<Data, RpcError> {
196		// Get the context lock
197		let mutex = self.lock().clone();
198		// Lock the context for update
199		let guard = mutex.acquire().await;
200		// Clone the current session
201		let mut session = self.session().as_ref().clone();
202		// Clear the current session
203		crate::iam::clear::clear(&mut session)?;
204		// Store the updated session
205		self.set_session(Arc::new(session));
206		// Drop the mutex guard
207		std::mem::drop(guard);
208		// Return nothing on success
209		Ok(Value::None.into())
210	}
211
212	async fn reset(&self) -> Result<Data, RpcError> {
213		// Get the context lock
214		let mutex = self.lock().clone();
215		// Lock the context for update
216		let guard = mutex.acquire().await;
217		// Clone the current session
218		let mut session = self.session().as_ref().clone();
219		// Reset the current session
220		crate::iam::reset::reset(&mut session)?;
221		// Store the updated session
222		self.set_session(Arc::new(session));
223		// Drop the mutex guard
224		std::mem::drop(guard);
225		// Cleanup live queries
226		self.cleanup_lqs().await;
227		// Return nothing on success
228		Ok(Value::None.into())
229	}
230
231	// ------------------------------
232	// Methods for identification
233	// ------------------------------
234
235	async fn info(&self) -> Result<Data, RpcError> {
236		// Specify the SQL query string
237		let sql = SelectStatement {
238			expr: Fields::all(),
239			what: vec![Value::Param("auth".into())].into(),
240			..Default::default()
241		}
242		.into();
243		// Execute the query on the database
244		let mut res = self.kvs().process(sql, &self.session(), None).await?;
245		// Extract the first value from the result
246		Ok(res.remove(0).result?.first().into())
247	}
248
249	// ------------------------------
250	// Methods for setting variables
251	// ------------------------------
252
253	async fn set(&self, params: Array) -> Result<Data, RpcError> {
254		// Check if the user is allowed to query
255		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
256			return Err(RpcError::MethodNotAllowed);
257		}
258		// Process the method arguments
259		let Ok((Value::Strand(key), val)) = params.needs_one_or_two() else {
260			return Err(RpcError::InvalidParams);
261		};
262		// Specify the query parameters
263		let var = Some(map! {
264			key.0.clone() => Value::None,
265		});
266		// Compute the specified parameter
267		match self.kvs().compute(val, &self.session(), var).await? {
268			// Remove the variable if undefined
269			Value::None => {
270				// Get the context lock
271				let mutex = self.lock().clone();
272				// Lock the context for update
273				let guard = mutex.acquire().await;
274				// Clone the parameters
275				let mut session = self.session().as_ref().clone();
276				// Remove the set parameter
277				session.parameters.remove(&key.0);
278				// Store the updated session
279				self.set_session(Arc::new(session));
280				// Drop the mutex guard
281				std::mem::drop(guard);
282			}
283			// Store the variable if defined
284			v => {
285				// Get the context lock
286				let mutex = self.lock().clone();
287				// Lock the context for update
288				let guard = mutex.acquire().await;
289				// Clone the parameters
290				let mut session = self.session().as_ref().clone();
291				// Remove the set parameter
292				session.parameters.insert(key.0, v);
293				// Store the updated session
294				self.set_session(Arc::new(session));
295				// Drop the mutex guard
296				std::mem::drop(guard);
297			}
298		};
299		// Return nothing
300		Ok(Value::Null.into())
301	}
302
303	async fn unset(&self, params: Array) -> Result<Data, RpcError> {
304		// Check if the user is allowed to query
305		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
306			return Err(RpcError::MethodNotAllowed);
307		}
308		// Process the method arguments
309		let Ok(Value::Strand(key)) = params.needs_one() else {
310			return Err(RpcError::InvalidParams);
311		};
312		// Get the context lock
313		let mutex = self.lock().clone();
314		// Lock the context for update
315		let guard = mutex.acquire().await;
316		// Clone the parameters
317		let mut session = self.session().as_ref().clone();
318		// Remove the set parameter
319		session.parameters.remove(&key.0);
320		// Store the updated session
321		self.set_session(Arc::new(session));
322		// Drop the mutex guard
323		std::mem::drop(guard);
324		// Return nothing
325		Ok(Value::Null.into())
326	}
327
328	// ------------------------------
329	// Methods for live queries
330	// ------------------------------
331
332	async fn kill(&self, params: Array) -> Result<Data, RpcError> {
333		// Check if the user is allowed to query
334		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
335			return Err(RpcError::MethodNotAllowed);
336		}
337		// Process the method arguments
338		let id = params.needs_one()?;
339		// Specify the SQL query string
340		let sql = KillStatement {
341			id,
342		}
343		.into();
344		// Specify the query parameters
345		let var = Some(self.session().parameters.clone());
346		// Execute the query on the database
347		let mut res = self.query_inner(Value::Query(sql), var).await?;
348		// Extract the first query result
349		Ok(res.remove(0).result?.into())
350	}
351
352	async fn live(&self, params: Array) -> Result<Data, RpcError> {
353		// Check if the user is allowed to query
354		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
355			return Err(RpcError::MethodNotAllowed);
356		}
357		// Process the method arguments
358		let (what, diff) = params.needs_one_or_two()?;
359		// Specify the SQL query string
360		let sql = LiveStatement::new_from_what_expr(
361			match diff.is_true() {
362				true => Fields::default(),
363				false => Fields::all(),
364			},
365			what.could_be_table(),
366		)
367		.into();
368		// Specify the query parameters
369		let var = Some(self.session().parameters.clone());
370		// Execute the query on the database
371		let mut res = self.query_inner(Value::Query(sql), var).await?;
372		// Extract the first query result
373		Ok(res.remove(0).result?.into())
374	}
375
376	// ------------------------------
377	// Methods for selecting
378	// ------------------------------
379
380	async fn select(&self, params: Array) -> Result<Data, RpcError> {
381		// Check if the user is allowed to query
382		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
383			return Err(RpcError::MethodNotAllowed);
384		}
385		// Process the method arguments
386		let Ok(what) = params.needs_one() else {
387			return Err(RpcError::InvalidParams);
388		};
389		// Specify the SQL query string
390		let sql = SelectStatement {
391			only: what.is_thing_single(),
392			expr: Fields::all(),
393			what: vec![what.could_be_table()].into(),
394			..Default::default()
395		}
396		.into();
397		// Specify the query parameters
398		let var = Some(self.session().parameters.clone());
399		// Execute the query on the database
400		let mut res = self.kvs().process(sql, &self.session(), var).await?;
401		// Extract the first query result
402		Ok(res
403			.remove(0)
404			.result
405			.or_else(|e| match e {
406				Error::SingleOnlyOutput => Ok(Value::None),
407				e => Err(e),
408			})?
409			.into())
410	}
411
412	// ------------------------------
413	// Methods for inserting
414	// ------------------------------
415
416	async fn insert(&self, params: Array) -> Result<Data, RpcError> {
417		// Check if the user is allowed to query
418		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
419			return Err(RpcError::MethodNotAllowed);
420		}
421		// Process the method arguments
422		let Ok((what, data)) = params.needs_two() else {
423			return Err(RpcError::InvalidParams);
424		};
425		// Specify the SQL query string
426		let sql = InsertStatement {
427			into: match what.is_none_or_null() {
428				false => Some(what.could_be_table()),
429				true => None,
430			},
431			data: crate::sql::Data::SingleExpression(data),
432			output: Some(Output::After),
433			..Default::default()
434		}
435		.into();
436		// Specify the query parameters
437		let var = Some(self.session().parameters.clone());
438		// Execute the query on the database
439		let mut res = self.kvs().process(sql, &self.session(), var).await?;
440		// Extract the first query result
441		Ok(res
442			.remove(0)
443			.result
444			.or_else(|e| match e {
445				Error::SingleOnlyOutput => Ok(Value::None),
446				e => Err(e),
447			})?
448			.into())
449	}
450
451	async fn insert_relation(&self, params: Array) -> Result<Data, RpcError> {
452		// Check if the user is allowed to query
453		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
454			return Err(RpcError::MethodNotAllowed);
455		}
456		// Process the method arguments
457		let Ok((what, data)) = params.needs_two() else {
458			return Err(RpcError::InvalidParams);
459		};
460		// Specify the SQL query string
461		let sql = InsertStatement {
462			relation: true,
463			into: match what.is_none_or_null() {
464				false => Some(what.could_be_table()),
465				true => None,
466			},
467			data: crate::sql::Data::SingleExpression(data),
468			output: Some(Output::After),
469			..Default::default()
470		}
471		.into();
472		// Specify the query parameters
473		let var = Some(self.session().parameters.clone());
474		// Execute the query on the database
475		let mut res = self.kvs().process(sql, &self.session(), var).await?;
476		// Extract the first query result
477		Ok(res
478			.remove(0)
479			.result
480			.or_else(|e| match e {
481				Error::SingleOnlyOutput => Ok(Value::None),
482				e => Err(e),
483			})?
484			.into())
485	}
486
487	// ------------------------------
488	// Methods for creating
489	// ------------------------------
490
491	async fn create(&self, params: Array) -> Result<Data, RpcError> {
492		// Check if the user is allowed to query
493		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
494			return Err(RpcError::MethodNotAllowed);
495		}
496		// Process the method arguments
497		let Ok((what, data)) = params.needs_one_or_two() else {
498			return Err(RpcError::InvalidParams);
499		};
500		let what = what.could_be_table();
501		// Specify the SQL query string
502		let sql = CreateStatement {
503			only: what.is_thing_single() || what.is_table(),
504			what: vec![what.could_be_table()].into(),
505			data: match data.is_none_or_null() {
506				false => Some(crate::sql::Data::ContentExpression(data)),
507				true => None,
508			},
509			output: Some(Output::After),
510			..Default::default()
511		}
512		.into();
513		// Execute the query on the database
514		let mut res = self.kvs().process(sql, &self.session(), None).await?;
515		// Extract the first query result
516		Ok(res
517			.remove(0)
518			.result
519			.or_else(|e| match e {
520				Error::SingleOnlyOutput => Ok(Value::None),
521				e => Err(e),
522			})?
523			.into())
524	}
525
526	// ------------------------------
527	// Methods for upserting
528	// ------------------------------
529
530	async fn upsert(&self, params: Array) -> Result<Data, RpcError> {
531		// Check if the user is allowed to query
532		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
533			return Err(RpcError::MethodNotAllowed);
534		}
535		// Process the method arguments
536		let Ok((what, data)) = params.needs_one_or_two() else {
537			return Err(RpcError::InvalidParams);
538		};
539		// Specify the SQL query string
540		let sql = UpsertStatement {
541			only: what.is_thing_single(),
542			what: vec![what.could_be_table()].into(),
543			data: match data.is_none_or_null() {
544				false => Some(crate::sql::Data::ContentExpression(data)),
545				true => None,
546			},
547			output: Some(Output::After),
548			..Default::default()
549		}
550		.into();
551		// Specify the query parameters
552		let var = Some(self.session().parameters.clone());
553		// Execute the query on the database
554		let mut res = self.kvs().process(sql, &self.session(), var).await?;
555		// Extract the first query result
556		Ok(res
557			.remove(0)
558			.result
559			.or_else(|e| match e {
560				Error::SingleOnlyOutput => Ok(Value::None),
561				e => Err(e),
562			})?
563			.into())
564	}
565
566	// ------------------------------
567	// Methods for updating
568	// ------------------------------
569
570	async fn update(&self, params: Array) -> Result<Data, RpcError> {
571		// Check if the user is allowed to query
572		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
573			return Err(RpcError::MethodNotAllowed);
574		}
575		// Process the method arguments
576		let Ok((what, data)) = params.needs_one_or_two() else {
577			return Err(RpcError::InvalidParams);
578		};
579		// Specify the SQL query string
580		let sql = UpdateStatement {
581			only: what.is_thing_single(),
582			what: vec![what.could_be_table()].into(),
583			data: match data.is_none_or_null() {
584				false => Some(crate::sql::Data::ContentExpression(data)),
585				true => None,
586			},
587			output: Some(Output::After),
588			..Default::default()
589		}
590		.into();
591		// Specify the query parameters
592		let var = Some(self.session().parameters.clone());
593		// Execute the query on the database
594		let mut res = self.kvs().process(sql, &self.session(), var).await?;
595		// Extract the first query result
596		Ok(res
597			.remove(0)
598			.result
599			.or_else(|e| match e {
600				Error::SingleOnlyOutput => Ok(Value::None),
601				e => Err(e),
602			})?
603			.into())
604	}
605
606	// ------------------------------
607	// Methods for merging
608	// ------------------------------
609
610	async fn merge(&self, params: Array) -> Result<Data, RpcError> {
611		// Check if the user is allowed to query
612		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
613			return Err(RpcError::MethodNotAllowed);
614		}
615		// Process the method arguments
616		let Ok((what, data)) = params.needs_one_or_two() else {
617			return Err(RpcError::InvalidParams);
618		};
619		// Specify the SQL query string
620		let sql = UpdateStatement {
621			only: what.is_thing_single(),
622			what: vec![what.could_be_table()].into(),
623			data: match data.is_none_or_null() {
624				false => Some(crate::sql::Data::MergeExpression(data)),
625				true => None,
626			},
627			output: Some(Output::After),
628			..Default::default()
629		}
630		.into();
631		// Specify the query parameters
632		let var = Some(self.session().parameters.clone());
633		// Execute the query on the database
634		let mut res = self.kvs().process(sql, &self.session(), var).await?;
635		// Extract the first query result
636		Ok(res
637			.remove(0)
638			.result
639			.or_else(|e| match e {
640				Error::SingleOnlyOutput => Ok(Value::None),
641				e => Err(e),
642			})?
643			.into())
644	}
645
646	// ------------------------------
647	// Methods for patching
648	// ------------------------------
649
650	async fn patch(&self, params: Array) -> Result<Data, RpcError> {
651		// Check if the user is allowed to query
652		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
653			return Err(RpcError::MethodNotAllowed);
654		}
655		// Process the method arguments
656		let Ok((what, data, diff)) = params.needs_one_two_or_three() else {
657			return Err(RpcError::InvalidParams);
658		};
659		// Specify the SQL query string
660		let sql = UpdateStatement {
661			only: what.is_thing_single(),
662			what: vec![what.could_be_table()].into(),
663			data: Some(crate::sql::Data::PatchExpression(data)),
664			output: match diff.is_true() {
665				true => Some(Output::Diff),
666				false => Some(Output::After),
667			},
668			..Default::default()
669		}
670		.into();
671		// Specify the query parameters
672		let var = Some(self.session().parameters.clone());
673		// Execute the query on the database
674		let mut res = self.kvs().process(sql, &self.session(), var).await?;
675		// Extract the first query result
676		Ok(res
677			.remove(0)
678			.result
679			.or_else(|e| match e {
680				Error::SingleOnlyOutput => Ok(Value::None),
681				e => Err(e),
682			})?
683			.into())
684	}
685
686	// ------------------------------
687	// Methods for relating
688	// ------------------------------
689
690	async fn relate(&self, params: Array) -> Result<Data, RpcError> {
691		// Check if the user is allowed to query
692		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
693			return Err(RpcError::MethodNotAllowed);
694		}
695		// Process the method arguments
696		let Ok((from, kind, with, data)) = params.needs_three_or_four() else {
697			return Err(RpcError::InvalidParams);
698		};
699		// Specify the SQL query string
700		let sql = RelateStatement {
701			only: from.is_single() && with.is_single(),
702			from,
703			kind: kind.could_be_table(),
704			with,
705			data: match data.is_none_or_null() {
706				false => Some(crate::sql::Data::ContentExpression(data)),
707				true => None,
708			},
709			output: Some(Output::After),
710			..Default::default()
711		}
712		.into();
713		// Specify the query parameters
714		let var = Some(self.session().parameters.clone());
715		// Execute the query on the database
716		let mut res = self.kvs().process(sql, &self.session(), var).await?;
717		// Extract the first query result
718		Ok(res
719			.remove(0)
720			.result
721			.or_else(|e| match e {
722				Error::SingleOnlyOutput => Ok(Value::None),
723				e => Err(e),
724			})?
725			.into())
726	}
727
728	// ------------------------------
729	// Methods for deleting
730	// ------------------------------
731
732	async fn delete(&self, params: Array) -> Result<Data, RpcError> {
733		// Check if the user is allowed to query
734		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
735			return Err(RpcError::MethodNotAllowed);
736		}
737		// Process the method arguments
738		let Ok(what) = params.needs_one() else {
739			return Err(RpcError::InvalidParams);
740		};
741		// Specify the SQL query string
742		let sql = DeleteStatement {
743			only: what.is_thing_single(),
744			what: vec![what.could_be_table()].into(),
745			output: Some(Output::Before),
746			..Default::default()
747		}
748		.into();
749		// Specify the query parameters
750		let var = Some(self.session().parameters.clone());
751		// Execute the query on the database
752		let mut res = self.kvs().process(sql, &self.session(), var).await?;
753		// Extract the first query result
754		Ok(res
755			.remove(0)
756			.result
757			.or_else(|e| match e {
758				Error::SingleOnlyOutput => Ok(Value::None),
759				e => Err(e),
760			})?
761			.into())
762	}
763
764	// ------------------------------
765	// Methods for getting info
766	// ------------------------------
767
768	async fn version(&self, params: Array) -> Result<Data, RpcError> {
769		match params.len() {
770			0 => Ok(self.version_data()),
771			_ => Err(RpcError::InvalidParams),
772		}
773	}
774
775	// ------------------------------
776	// Methods for querying
777	// ------------------------------
778
779	async fn query(&self, params: Array) -> Result<Data, RpcError> {
780		// Check if the user is allowed to query
781		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
782			return Err(RpcError::MethodNotAllowed);
783		}
784		// Process the method arguments
785		let Ok((query, vars)) = params.needs_one_or_two() else {
786			return Err(RpcError::InvalidParams);
787		};
788		// Check the query input type
789		if !(query.is_query() || query.is_strand()) {
790			return Err(RpcError::InvalidParams);
791		}
792		// Specify the query variables
793		let vars = match vars {
794			Value::Object(mut v) => Some(mrg! {v.0, self.session().parameters.clone()}),
795			Value::None | Value::Null => Some(self.session().parameters.clone()),
796			_ => return Err(RpcError::InvalidParams),
797		};
798		// Execute the specified query
799		self.query_inner(query, vars).await.map(Into::into)
800	}
801
802	// ------------------------------
803	// Methods for running functions
804	// ------------------------------
805
806	async fn run(&self, params: Array) -> Result<Data, RpcError> {
807		// Check if the user is allowed to query
808		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
809			return Err(RpcError::MethodNotAllowed);
810		}
811		// Process the method arguments
812		let Ok((name, version, args)) = params.needs_one_two_or_three() else {
813			return Err(RpcError::InvalidParams);
814		};
815		// Parse the function name argument
816		let name = match name {
817			Value::Strand(Strand(v)) => v,
818			_ => return Err(RpcError::InvalidParams),
819		};
820		// Parse any function version argument
821		let version = match version {
822			Value::Strand(Strand(v)) => Some(v),
823			Value::None | Value::Null => None,
824			_ => return Err(RpcError::InvalidParams),
825		};
826		// Parse the function arguments if specified
827		let args = match args {
828			Value::Array(Array(arr)) => arr,
829			Value::None | Value::Null => vec![],
830			_ => return Err(RpcError::InvalidParams),
831		};
832		// Specify the function to run
833		let func: Query = match &name[0..4] {
834			"fn::" => Function::Custom(name.chars().skip(4).collect(), args).into(),
835			"ml::" => Model {
836				name: name.chars().skip(4).collect(),
837				version: version.ok_or(RpcError::InvalidParams)?,
838				args,
839			}
840			.into(),
841			_ => Function::Normal(name, args).into(),
842		};
843		// Specify the query parameters
844		let var = Some(self.session().parameters.clone());
845		// Execute the function on the database
846		let mut res = self.kvs().process(func, &self.session(), var).await?;
847		// Extract the first query result
848		Ok(res.remove(0).result?.into())
849	}
850
851	// ------------------------------
852	// Methods for querying with GraphQL
853	// ------------------------------
854
855	#[cfg(any(target_family = "wasm", not(surrealdb_unstable)))]
856	async fn graphql(&self, _: Array) -> Result<Data, RpcError> {
857		Err(RpcError::MethodNotFound)
858	}
859
860	#[cfg(all(not(target_family = "wasm"), surrealdb_unstable))]
861	async fn graphql(&self, params: Array) -> Result<Data, RpcError> {
862		// Check if the user is allowed to query
863		if !self.kvs().allows_query_by_subject(self.session().au.as_ref()) {
864			return Err(RpcError::MethodNotAllowed);
865		}
866		if !self.kvs().get_capabilities().allows_experimental(&ExperimentalTarget::GraphQL) {
867			return Err(RpcError::BadGQLConfig);
868		}
869
870		use serde::Serialize;
871
872		use crate::gql;
873
874		if !Self::GQL_SUPPORT {
875			return Err(RpcError::BadGQLConfig);
876		}
877
878		let Ok((query, options)) = params.needs_one_or_two() else {
879			return Err(RpcError::InvalidParams);
880		};
881
882		enum GraphQLFormat {
883			Json,
884		}
885
886		// Default to compressed output
887		let mut pretty = false;
888		// Default to graphql json format
889		let mut format = GraphQLFormat::Json;
890		// Process any secondary config options
891		match options {
892			// A config object was passed
893			Value::Object(o) => {
894				for (k, v) in o {
895					match (k.as_str(), v) {
896						("pretty", Value::Bool(b)) => pretty = b,
897						("format", Value::Strand(s)) => match s.as_str() {
898							"json" => format = GraphQLFormat::Json,
899							_ => return Err(RpcError::InvalidParams),
900						},
901						_ => return Err(RpcError::InvalidParams),
902					}
903				}
904			}
905			// The config argument was not supplied
906			Value::None => (),
907			// An invalid config argument was received
908			_ => return Err(RpcError::InvalidParams),
909		}
910		// Process the graphql query argument
911		let req = match query {
912			// It is a string, so parse the query
913			Value::Strand(s) => match format {
914				GraphQLFormat::Json => {
915					let tmp: BatchRequest =
916						serde_json::from_str(s.as_str()).map_err(|_| RpcError::ParseError)?;
917					tmp.into_single().map_err(|_| RpcError::ParseError)?
918				}
919			},
920			// It is an object, so build the query
921			Value::Object(mut o) => {
922				// We expect a `query` key with the graphql query
923				let mut tmp = match o.remove("query") {
924					Some(Value::Strand(s)) => async_graphql::Request::new(s),
925					_ => return Err(RpcError::InvalidParams),
926				};
927				// We can accept a `variables` key with graphql variables
928				match o.remove("variables").or(o.remove("vars")) {
929					Some(obj @ Value::Object(_)) => {
930						let gql_vars = gql::schema::sql_value_to_gql_value(obj)
931							.map_err(|_| RpcError::InvalidRequest)?;
932
933						tmp = tmp.variables(async_graphql::Variables::from_value(gql_vars));
934					}
935					Some(_) => return Err(RpcError::InvalidParams),
936					None => {}
937				}
938				// We can accept an `operation` key with a graphql operation name
939				match o.remove("operationName").or(o.remove("operation")) {
940					Some(Value::Strand(s)) => tmp = tmp.operation_name(s),
941					Some(_) => return Err(RpcError::InvalidParams),
942					None => {}
943				}
944				// Return the graphql query object
945				tmp
946			}
947			// We received an invalid graphql query
948			_ => return Err(RpcError::InvalidParams),
949		};
950		// Process and cache the graphql schema
951		let schema = self
952			.graphql_schema_cache()
953			.get_schema(&self.session())
954			.await
955			.map_err(|e| RpcError::Thrown(e.to_string()))?;
956		// Execute the request against the schema
957		let res = schema.execute(req).await;
958		// Serialize the graphql response
959		let out = match pretty {
960			true => {
961				let mut buf = Vec::new();
962				let formatter = serde_json::ser::PrettyFormatter::with_indent(b"    ");
963				let mut ser = serde_json::Serializer::with_formatter(&mut buf, formatter);
964				res.serialize(&mut ser).ok().and_then(|_| String::from_utf8(buf).ok())
965			}
966			false => serde_json::to_string(&res).ok(),
967		}
968		.ok_or(RpcError::Thrown("Serialization Error".to_string()))?;
969		// Output the graphql response
970		Ok(Value::Strand(out.into()).into())
971	}
972
973	// ------------------------------
974	// Private methods
975	// ------------------------------
976
977	async fn query_inner(
978		&self,
979		query: Value,
980		vars: Option<BTreeMap<String, Value>>,
981	) -> Result<Vec<Response>, RpcError> {
982		// If no live query handler force realtime off
983		if !Self::LQ_SUPPORT && self.session().rt {
984			return Err(RpcError::BadLQConfig);
985		}
986		// Execute the query on the database
987		let res = match query {
988			Value::Query(sql) => self.kvs().process(sql, &self.session(), vars).await?,
989			Value::Strand(sql) => self.kvs().execute(&sql, &self.session(), vars).await?,
990			_ => return Err(fail!("Unexpected query type: {query:?}").into()),
991		};
992
993		// Post-process hooks for web layer
994		for response in &res {
995			// This error should be unreachable because we shouldn't proceed if there's no handler
996			self.handle_live_query_results(response).await;
997		}
998		// Return the result to the client
999		Ok(res)
1000	}
1001
1002	async fn handle_live_query_results(&self, res: &Response) {
1003		match &res.query_type {
1004			QueryType::Live => {
1005				if let Ok(Value::Uuid(lqid)) = &res.result {
1006					self.handle_live(&lqid.0).await;
1007				}
1008			}
1009			QueryType::Kill => {
1010				if let Ok(Value::Uuid(lqid)) = &res.result {
1011					self.handle_kill(&lqid.0).await;
1012				}
1013			}
1014			_ => {}
1015		}
1016	}
1017}