surrealdb_core/kvs/
ds.rs

1use super::export;
2use super::tr::Transactor;
3use super::tx::Transaction;
4use super::version::Version;
5use crate::cf;
6use crate::ctx::MutableContext;
7#[cfg(feature = "jwks")]
8use crate::dbs::capabilities::NetTarget;
9use crate::dbs::capabilities::{
10	ArbitraryQueryTarget, ExperimentalTarget, MethodTarget, RouteTarget,
11};
12use crate::dbs::node::Timestamp;
13use crate::dbs::{
14	Attach, Capabilities, Executor, Notification, Options, Response, Session, Variables,
15};
16use crate::err::Error;
17#[cfg(feature = "jwks")]
18use crate::iam::jwks::JwksCache;
19use crate::iam::{Action, Auth, Error as IamError, Resource, Role};
20use crate::idx::trees::store::IndexStores;
21use crate::kvs::cache::ds::DatastoreCache;
22use crate::kvs::clock::SizedClock;
23#[allow(unused_imports)]
24use crate::kvs::clock::SystemClock;
25#[cfg(not(target_family = "wasm"))]
26use crate::kvs::index::IndexBuilder;
27use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
28use crate::sql::{statements::DefineUserStatement, Base, Query, Value};
29use crate::syn;
30use crate::syn::parser::{ParserSettings, StatementStream};
31use async_channel::{Receiver, Sender};
32use bytes::{Bytes, BytesMut};
33use futures::{Future, Stream};
34use reblessive::TreeStack;
35use std::fmt;
36#[cfg(storage)]
37use std::path::PathBuf;
38use std::pin::pin;
39use std::sync::Arc;
40use std::task::{ready, Poll};
41use std::time::Duration;
42#[cfg(not(target_family = "wasm"))]
43use std::time::{SystemTime, UNIX_EPOCH};
44#[cfg(feature = "jwks")]
45use tokio::sync::RwLock;
46use tracing::instrument;
47use tracing::trace;
48use uuid::Uuid;
49#[cfg(target_family = "wasm")]
50use wasmtimer::std::{SystemTime, UNIX_EPOCH};
51
52const TARGET: &str = "surrealdb::core::kvs::ds";
53
54// If there are an infinite number of heartbeats, then we want to go batch-by-batch spread over several checks
55const LQ_CHANNEL_SIZE: usize = 15_000;
56
57// The role assigned to the initial user created when starting the server with credentials for the first time
58const INITIAL_USER_ROLE: &str = "owner";
59
60/// The underlying datastore instance which stores the dataset.
61#[allow(dead_code)]
62#[non_exhaustive]
63pub struct Datastore {
64	transaction_factory: TransactionFactory,
65	/// The unique id of this datastore, used in notifications.
66	id: Uuid,
67	/// Whether this datastore runs in strict mode by default.
68	strict: bool,
69	/// Whether authentication is enabled on this datastore.
70	auth_enabled: bool,
71	/// The maximum duration timeout for running multiple statements in a query.
72	query_timeout: Option<Duration>,
73	/// The maximum duration timeout for running multiple statements in a transaction.
74	transaction_timeout: Option<Duration>,
75	/// The security and feature capabilities for this datastore.
76	capabilities: Arc<Capabilities>,
77	// Whether this datastore enables live query notifications to subscribers.
78	notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
79	// The index store cache
80	index_stores: IndexStores,
81	// The cross transaction cache
82	cache: Arc<DatastoreCache>,
83	// The index asynchronous builder
84	#[cfg(not(target_family = "wasm"))]
85	index_builder: IndexBuilder,
86	#[cfg(feature = "jwks")]
87	// The JWKS object cache
88	jwks_cache: Arc<RwLock<JwksCache>>,
89	#[cfg(storage)]
90	// The temporary directory
91	temporary_directory: Option<Arc<PathBuf>>,
92}
93
94#[derive(Clone)]
95pub(super) struct TransactionFactory {
96	// Clock for tracking time. It is read only and accessible to all transactions. It is behind a mutex as tests may write to it.
97	clock: Arc<SizedClock>,
98	// The inner datastore type
99	flavor: Arc<DatastoreFlavor>,
100}
101
102impl TransactionFactory {
103	#[allow(unreachable_code)]
104	pub async fn transaction(
105		&self,
106		write: TransactionType,
107		lock: LockType,
108	) -> Result<Transaction, Error> {
109		// Specify if the transaction is writeable
110		#[allow(unused_variables)]
111		let write = match write {
112			Read => false,
113			Write => true,
114		};
115		// Specify if the transaction is lockable
116		#[allow(unused_variables)]
117		let lock = match lock {
118			Pessimistic => true,
119			Optimistic => false,
120		};
121		// Create a new transaction on the datastore
122		#[allow(unused_variables)]
123		let (inner, local) = match self.flavor.as_ref() {
124			#[cfg(feature = "kv-mem")]
125			DatastoreFlavor::Mem(v) => {
126				let tx = v.transaction(write, lock).await?;
127				(super::tr::Inner::Mem(tx), true)
128			}
129			#[cfg(feature = "kv-rocksdb")]
130			DatastoreFlavor::RocksDB(v) => {
131				let tx = v.transaction(write, lock).await?;
132				(super::tr::Inner::RocksDB(tx), true)
133			}
134			#[cfg(feature = "kv-indxdb")]
135			DatastoreFlavor::IndxDB(v) => {
136				let tx = v.transaction(write, lock).await?;
137				(super::tr::Inner::IndxDB(tx), true)
138			}
139			#[cfg(feature = "kv-tikv")]
140			DatastoreFlavor::TiKV(v) => {
141				let tx = v.transaction(write, lock).await?;
142				(super::tr::Inner::TiKV(tx), false)
143			}
144			#[cfg(feature = "kv-fdb")]
145			DatastoreFlavor::FoundationDB(v) => {
146				let tx = v.transaction(write, lock).await?;
147				(super::tr::Inner::FoundationDB(tx), false)
148			}
149			#[cfg(feature = "kv-surrealkv")]
150			DatastoreFlavor::SurrealKV(v) => {
151				let tx = v.transaction(write, lock).await?;
152				(super::tr::Inner::SurrealKV(tx), true)
153			}
154			#[cfg(feature = "kv-surrealcs")]
155			DatastoreFlavor::SurrealCS(v) => {
156				let tx = v.transaction(write, lock).await?;
157				(super::tr::Inner::SurrealCS(tx), false)
158			}
159			#[allow(unreachable_patterns)]
160			_ => unreachable!(),
161		};
162		Ok(Transaction::new(
163			local,
164			Transactor {
165				inner,
166				stash: super::stash::Stash::default(),
167				cf: cf::Writer::new(),
168				clock: self.clock.clone(),
169			},
170		))
171	}
172}
173
174#[allow(clippy::large_enum_variant)]
175pub(super) enum DatastoreFlavor {
176	#[cfg(feature = "kv-mem")]
177	Mem(super::mem::Datastore),
178	#[cfg(feature = "kv-rocksdb")]
179	RocksDB(super::rocksdb::Datastore),
180	#[cfg(feature = "kv-indxdb")]
181	IndxDB(super::indxdb::Datastore),
182	#[cfg(feature = "kv-tikv")]
183	TiKV(super::tikv::Datastore),
184	#[cfg(feature = "kv-fdb")]
185	FoundationDB(super::fdb::Datastore),
186	#[cfg(feature = "kv-surrealkv")]
187	SurrealKV(super::surrealkv::Datastore),
188	#[cfg(feature = "kv-surrealcs")]
189	SurrealCS(super::surrealcs::Datastore),
190}
191
192impl fmt::Display for Datastore {
193	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194		#![allow(unused_variables)]
195		match self.transaction_factory.flavor.as_ref() {
196			#[cfg(feature = "kv-mem")]
197			DatastoreFlavor::Mem(_) => write!(f, "memory"),
198			#[cfg(feature = "kv-rocksdb")]
199			DatastoreFlavor::RocksDB(_) => write!(f, "rocksdb"),
200			#[cfg(feature = "kv-indxdb")]
201			DatastoreFlavor::IndxDB(_) => write!(f, "indxdb"),
202			#[cfg(feature = "kv-tikv")]
203			DatastoreFlavor::TiKV(_) => write!(f, "tikv"),
204			#[cfg(feature = "kv-fdb")]
205			DatastoreFlavor::FoundationDB(_) => write!(f, "fdb"),
206			#[cfg(feature = "kv-surrealkv")]
207			DatastoreFlavor::SurrealKV(_) => write!(f, "surrealkv"),
208			#[cfg(feature = "kv-surrealcs")]
209			DatastoreFlavor::SurrealCS(_) => write!(f, "surrealcs"),
210			#[allow(unreachable_patterns)]
211			_ => unreachable!(),
212		}
213	}
214}
215
216impl Datastore {
217	/// Creates a new datastore instance
218	///
219	/// # Examples
220	///
221	/// ```rust,no_run
222	/// # use surrealdb_core::kvs::Datastore;
223	/// # use surrealdb_core::err::Error;
224	/// # #[tokio::main]
225	/// # async fn main() -> Result<(), Error> {
226	/// let ds = Datastore::new("memory").await?;
227	/// # Ok(())
228	/// # }
229	/// ```
230	///
231	/// Or to create a file-backed store:
232	///
233	/// ```rust,no_run
234	/// # use surrealdb_core::kvs::Datastore;
235	/// # use surrealdb_core::err::Error;
236	/// # #[tokio::main]
237	/// # async fn main() -> Result<(), Error> {
238	/// let ds = Datastore::new("surrealkv://temp.skv").await?;
239	/// # Ok(())
240	/// # }
241	/// ```
242	///
243	/// Or to connect to a tikv-backed distributed store:
244	///
245	/// ```rust,no_run
246	/// # use surrealdb_core::kvs::Datastore;
247	/// # use surrealdb_core::err::Error;
248	/// # #[tokio::main]
249	/// # async fn main() -> Result<(), Error> {
250	/// let ds = Datastore::new("tikv://127.0.0.1:2379").await?;
251	/// # Ok(())
252	/// # }
253	/// ```
254	pub async fn new(path: &str) -> Result<Self, Error> {
255		Self::new_with_clock(path, None).await
256	}
257
258	#[allow(unused_variables)]
259	pub async fn new_with_clock(
260		path: &str,
261		clock: Option<Arc<SizedClock>>,
262	) -> Result<Datastore, Error> {
263		// Initiate the desired datastore
264		let (flavor, clock): (Result<DatastoreFlavor, Error>, Arc<SizedClock>) = match path {
265			// Initiate an in-memory datastore
266			"memory" => {
267				#[cfg(feature = "kv-mem")]
268				{
269					// Innitialise the storage engine
270					info!(target: TARGET, "Starting kvs store in {}", path);
271					let v = super::mem::Datastore::new().await.map(DatastoreFlavor::Mem);
272					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
273					info!(target: TARGET, "Started kvs store in {}", path);
274					Ok((v, c))
275				}
276				#[cfg(not(feature = "kv-mem"))]
277                return Err(Error::Ds("Cannot connect to the `memory` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
278			}
279			// Parse and initiate a File datastore
280			s if s.starts_with("file:") => {
281				#[cfg(feature = "kv-rocksdb")]
282				{
283					// Create a new blocking threadpool
284					super::threadpool::initialise();
285					// Innitialise the storage engine
286					info!(target: TARGET, "Starting kvs store at {}", path);
287					warn!("file:// is deprecated, please use surrealkv:// or rocksdb://");
288					let s = s.trim_start_matches("file://");
289					let s = s.trim_start_matches("file:");
290					let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB);
291					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
292					info!(target: TARGET, "Started kvs store at {}", path);
293					Ok((v, c))
294				}
295				#[cfg(not(feature = "kv-rocksdb"))]
296                return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
297			}
298			// Parse and initiate a RocksDB datastore
299			s if s.starts_with("rocksdb:") => {
300				#[cfg(feature = "kv-rocksdb")]
301				{
302					// Create a new blocking threadpool
303					super::threadpool::initialise();
304					// Innitialise the storage engine
305					info!(target: TARGET, "Starting kvs store at {}", path);
306					let s = s.trim_start_matches("rocksdb://");
307					let s = s.trim_start_matches("rocksdb:");
308					let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB);
309					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
310					info!(target: TARGET, "Started kvs store at {}", path);
311					Ok((v, c))
312				}
313				#[cfg(not(feature = "kv-rocksdb"))]
314                return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
315			}
316			// Parse and initiate a SurrealKV datastore
317			s if s.starts_with("surrealkv") => {
318				#[cfg(feature = "kv-surrealkv")]
319				{
320					// Create a new blocking threadpool
321					super::threadpool::initialise();
322					// Innitialise the storage engine
323					info!(target: TARGET, "Starting kvs store at {}", s);
324					let (path, enable_versions) =
325						super::surrealkv::Datastore::parse_start_string(s)?;
326					let v = super::surrealkv::Datastore::new(path, enable_versions)
327						.await
328						.map(DatastoreFlavor::SurrealKV);
329					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
330					info!(target: TARGET, "Started kvs store at {} with versions {}", path, if enable_versions { "enabled" } else { "disabled" });
331					Ok((v, c))
332				}
333				#[cfg(not(feature = "kv-surrealkv"))]
334                return Err(Error::Ds("Cannot connect to the `surrealkv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
335			}
336			// Parse and initiate a SurrealCS datastore
337			s if s.starts_with("surrealcs:") => {
338				#[cfg(feature = "kv-surrealcs")]
339				{
340					info!(target: TARGET, "Starting kvs store at {}", path);
341					let s = s.trim_start_matches("surrealcs://");
342					let s = s.trim_start_matches("surrealcs:");
343					let v =
344						super::surrealcs::Datastore::new(s).await.map(DatastoreFlavor::SurrealCS);
345					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
346					info!(target: TARGET, "Started kvs store at {}", path);
347					Ok((v, c))
348				}
349				#[cfg(not(feature = "kv-surrealcs"))]
350				return Err(Error::Ds("Cannot connect to the `surrealcs` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
351			}
352			// Parse and initiate an IndxDB database
353			s if s.starts_with("indxdb:") => {
354				#[cfg(feature = "kv-indxdb")]
355				{
356					info!(target: TARGET, "Starting kvs store at {}", path);
357					let s = s.trim_start_matches("indxdb://");
358					let s = s.trim_start_matches("indxdb:");
359					let v = super::indxdb::Datastore::new(s).await.map(DatastoreFlavor::IndxDB);
360					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
361					info!(target: TARGET, "Started kvs store at {}", path);
362					Ok((v, c))
363				}
364				#[cfg(not(feature = "kv-indxdb"))]
365                return Err(Error::Ds("Cannot connect to the `indxdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
366			}
367			// Parse and initiate a TiKV datastore
368			s if s.starts_with("tikv:") => {
369				#[cfg(feature = "kv-tikv")]
370				{
371					info!(target: TARGET, "Connecting to kvs store at {}", path);
372					let s = s.trim_start_matches("tikv://");
373					let s = s.trim_start_matches("tikv:");
374					let v = super::tikv::Datastore::new(s).await.map(DatastoreFlavor::TiKV);
375					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
376					info!(target: TARGET, "Connected to kvs store at {}", path);
377					Ok((v, c))
378				}
379				#[cfg(not(feature = "kv-tikv"))]
380                return Err(Error::Ds("Cannot connect to the `tikv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
381			}
382			// Parse and initiate a FoundationDB datastore
383			s if s.starts_with("fdb:") => {
384				#[cfg(feature = "kv-fdb")]
385				{
386					info!(target: TARGET, "Connecting to kvs store at {}", path);
387					let s = s.trim_start_matches("fdb://");
388					let s = s.trim_start_matches("fdb:");
389					let v = super::fdb::Datastore::new(s).await.map(DatastoreFlavor::FoundationDB);
390					let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
391					info!(target: TARGET, "Connected to kvs store at {}", path);
392					Ok((v, c))
393				}
394				#[cfg(not(feature = "kv-fdb"))]
395                return Err(Error::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
396			}
397			// The datastore path is not valid
398			_ => {
399				info!(target: TARGET, "Unable to load the specified datastore {}", path);
400				Err(Error::Ds("Unable to load the specified datastore".into()))
401			}
402		}?;
403		// Set the properties on the datastore
404		flavor.map(|flavor| {
405			let tf = TransactionFactory {
406				clock,
407				flavor: Arc::new(flavor),
408			};
409			Self {
410				id: Uuid::new_v4(),
411				transaction_factory: tf.clone(),
412				strict: false,
413				auth_enabled: false,
414				query_timeout: None,
415				transaction_timeout: None,
416				notification_channel: None,
417				capabilities: Arc::new(Capabilities::default()),
418				index_stores: IndexStores::default(),
419				#[cfg(not(target_family = "wasm"))]
420				index_builder: IndexBuilder::new(tf),
421				#[cfg(feature = "jwks")]
422				jwks_cache: Arc::new(RwLock::new(JwksCache::new())),
423				#[cfg(storage)]
424				temporary_directory: None,
425				cache: Arc::new(DatastoreCache::new()),
426			}
427		})
428	}
429
430	/// Create a new datastore with the same persistent data (inner), with flushed cache.
431	/// Simulating a server restart
432	#[allow(dead_code)]
433	pub fn restart(self) -> Self {
434		Self {
435			id: self.id,
436			strict: self.strict,
437			auth_enabled: self.auth_enabled,
438			query_timeout: self.query_timeout,
439			transaction_timeout: self.transaction_timeout,
440			capabilities: self.capabilities,
441			notification_channel: self.notification_channel,
442			index_stores: Default::default(),
443			#[cfg(not(target_family = "wasm"))]
444			index_builder: IndexBuilder::new(self.transaction_factory.clone()),
445			#[cfg(feature = "jwks")]
446			jwks_cache: Arc::new(Default::default()),
447			#[cfg(storage)]
448			temporary_directory: self.temporary_directory,
449			transaction_factory: self.transaction_factory,
450			cache: Arc::new(DatastoreCache::new()),
451		}
452	}
453
454	/// Specify whether this Datastore should run in strict mode
455	pub fn with_node_id(mut self, id: Uuid) -> Self {
456		self.id = id;
457		self
458	}
459
460	/// Specify whether this Datastore should run in strict mode
461	pub fn with_strict_mode(mut self, strict: bool) -> Self {
462		self.strict = strict;
463		self
464	}
465
466	/// Specify whether this datastore should enable live query notifications
467	pub fn with_notifications(mut self) -> Self {
468		self.notification_channel = Some(async_channel::bounded(LQ_CHANNEL_SIZE));
469		self
470	}
471
472	/// Set a global query timeout for this Datastore
473	pub fn with_query_timeout(mut self, duration: Option<Duration>) -> Self {
474		self.query_timeout = duration;
475		self
476	}
477
478	/// Set a global transaction timeout for this Datastore
479	pub fn with_transaction_timeout(mut self, duration: Option<Duration>) -> Self {
480		self.transaction_timeout = duration;
481		self
482	}
483
484	/// Set whether authentication is enabled for this Datastore
485	pub fn with_auth_enabled(mut self, enabled: bool) -> Self {
486		self.auth_enabled = enabled;
487		self
488	}
489
490	/// Set specific capabilities for this Datastore
491	pub fn with_capabilities(mut self, caps: Capabilities) -> Self {
492		self.capabilities = Arc::new(caps);
493		self
494	}
495
496	#[cfg(storage)]
497	/// Set a temporary directory for ordering of large result sets
498	pub fn with_temporary_directory(mut self, path: Option<PathBuf>) -> Self {
499		self.temporary_directory = path.map(Arc::new);
500		self
501	}
502
503	pub fn index_store(&self) -> &IndexStores {
504		&self.index_stores
505	}
506
507	/// Is authentication enabled for this Datastore?
508	pub fn is_auth_enabled(&self) -> bool {
509		self.auth_enabled
510	}
511
512	pub fn id(&self) -> Uuid {
513		self.id
514	}
515
516	/// Does the datastore allow excecuting an RPC method?
517	pub(crate) fn allows_rpc_method(&self, method_target: &MethodTarget) -> bool {
518		self.capabilities.allows_rpc_method(method_target)
519	}
520
521	/// Does the datastore allow requesting an HTTP route?
522	/// This function needs to be public to allow access from the CLI crate.
523	pub fn allows_http_route(&self, route_target: &RouteTarget) -> bool {
524		self.capabilities.allows_http_route(route_target)
525	}
526
527	/// Does the datastore allow requesting an HTTP route?
528	/// This function needs to be public to allow access from the CLI crate.
529	pub fn allows_query_by_subject(&self, subject: impl Into<ArbitraryQueryTarget>) -> bool {
530		self.capabilities.allows_query(&subject.into())
531	}
532
533	/// Does the datastore allow connections to a network target?
534	#[cfg(feature = "jwks")]
535	pub(crate) fn allows_network_target(&self, net_target: &NetTarget) -> bool {
536		self.capabilities.allows_network_target(net_target)
537	}
538
539	/// Set specific capabilities for this Datastore
540	pub fn get_capabilities(&self) -> &Capabilities {
541		&self.capabilities
542	}
543
544	#[cfg(feature = "jwks")]
545	pub(crate) fn jwks_cache(&self) -> &Arc<RwLock<JwksCache>> {
546		&self.jwks_cache
547	}
548
549	pub(super) async fn clock_now(&self) -> Timestamp {
550		self.transaction_factory.clock.now().await
551	}
552
553	// Used for testing live queries
554	#[allow(dead_code)]
555	pub fn get_cache(&self) -> Arc<DatastoreCache> {
556		self.cache.clone()
557	}
558
559	// Initialise the cluster and run bootstrap utilities
560	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
561	pub async fn check_version(&self) -> Result<Version, Error> {
562		let version = self.get_version().await?;
563		// Check we are running the latest version
564		if !version.is_latest() {
565			return Err(Error::OutdatedStorageVersion);
566		}
567		// Everything ok
568		Ok(version)
569	}
570
571	// Initialise the cluster and run bootstrap utilities
572	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
573	pub async fn get_version(&self) -> Result<Version, Error> {
574		// Start a new writeable transaction
575		let txn = self.transaction(Write, Pessimistic).await?.enclose();
576		// Create the key where the version is stored
577		let key = crate::key::version::new();
578		// Check if a version is already set in storage
579		let val = match catch!(txn, txn.get(key.clone(), None).await) {
580			// There is a version set in the storage
581			Some(v) => {
582				// Attempt to decode the current stored version
583				let val = TryInto::<Version>::try_into(v);
584				// Check for errors, and cancel the transaction
585				match val {
586					// There was en error getting the version
587					Err(err) => {
588						// We didn't write anything, so just rollback
589						catch!(txn, txn.cancel().await);
590						// Return the error
591						return Err(err);
592					}
593					// We could decode the version correctly
594					Ok(val) => {
595						// We didn't write anything, so just rollback
596						catch!(txn, txn.cancel().await);
597						// Return the current version
598						val
599					}
600				}
601			}
602			// There is no version set in the storage
603			None => {
604				// Fetch any keys immediately following the version key
605				let rng = crate::key::version::proceeding();
606				let keys = catch!(txn, txn.keys(rng, 1, None).await);
607				// Check the storage if there are any other keys set
608				let val = if keys.is_empty() {
609					// There are no keys set in storage, so this is a new database
610					Version::latest()
611				} else {
612					// There were keys in storage, so this is an upgrade
613					Version::v1()
614				};
615				// Convert the version to binary
616				let bytes: Vec<u8> = val.into();
617				// Attempt to set the current version in storage
618				catch!(txn, txn.replace(key, bytes).await);
619				// We set the version, so commit the transaction
620				catch!(txn, txn.commit().await);
621				// Return the current version
622				val
623			}
624		};
625		// Everything ok
626		Ok(val)
627	}
628
629	/// Setup the initial cluster access credentials
630	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
631	pub async fn initialise_credentials(&self, user: &str, pass: &str) -> Result<(), Error> {
632		// Start a new writeable transaction
633		let txn = self.transaction(Write, Optimistic).await?.enclose();
634		// Fetch the root users from the storage
635		let users = catch!(txn, txn.all_root_users().await);
636		// Process credentials, depending on existing users
637		if users.is_empty() {
638			// Display information in the logs
639			info!(target: TARGET, "Credentials were provided, and no root users were found. The root user '{user}' will be created");
640			// Create and new root user definition
641			let stm = DefineUserStatement::from((Base::Root, user, pass, INITIAL_USER_ROLE));
642			let opt = Options::new().with_auth(Arc::new(Auth::for_root(Role::Owner)));
643			let mut ctx = MutableContext::default();
644			ctx.set_transaction(txn.clone());
645			let ctx = ctx.freeze();
646			catch!(txn, stm.compute(&ctx, &opt, None).await);
647			// We added a user, so commit the transaction
648			txn.commit().await
649		} else {
650			// Display information in the logs
651			warn!(target: TARGET, "Credentials were provided, but existing root users were found. The root user '{user}' will not be created");
652			warn!(target: TARGET, "Consider removing the --user and --pass arguments from the server start command");
653			// We didn't write anything, so just rollback
654			txn.cancel().await
655		}
656	}
657
658	/// Initialise the cluster and run bootstrap utilities
659	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
660	pub async fn bootstrap(&self) -> Result<(), Error> {
661		// Insert this node in the cluster
662		self.insert_node(self.id).await?;
663		// Mark inactive nodes as archived
664		self.expire_nodes().await?;
665		// Remove archived nodes
666		self.remove_nodes().await?;
667		// Everything ok
668		Ok(())
669	}
670
671	/// Run the background task to update node registration information
672	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
673	pub async fn node_membership_update(&self) -> Result<(), Error> {
674		// Output function invocation details to logs
675		trace!(target: TARGET, "Updating node registration information");
676		// Update this node in the cluster
677		self.update_node(self.id).await?;
678		// Everything ok
679		Ok(())
680	}
681
682	/// Run the background task to process and archive inactive nodes
683	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
684	pub async fn node_membership_expire(&self) -> Result<(), Error> {
685		// Output function invocation details to logs
686		trace!(target: TARGET, "Processing and archiving inactive nodes");
687		// Mark expired nodes as archived
688		self.expire_nodes().await?;
689		// Everything ok
690		Ok(())
691	}
692
693	/// Run the background task to process and cleanup archived nodes
694	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
695	pub async fn node_membership_remove(&self) -> Result<(), Error> {
696		// Output function invocation details to logs
697		trace!(target: TARGET, "Processing and cleaning archived nodes");
698		// Cleanup expired nodes data
699		self.remove_nodes().await?;
700		// Everything ok
701		Ok(())
702	}
703
704	/// Run the background task to perform changefeed garbage collection
705	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
706	pub async fn changefeed_process(&self) -> Result<(), Error> {
707		// Output function invocation details to logs
708		trace!(target: TARGET, "Running changefeed garbage collection");
709		// Calculate the current system time
710		let ts = SystemTime::now()
711			.duration_since(UNIX_EPOCH)
712			.map_err(|e| {
713				Error::Internal(format!("Clock may have gone backwards: {:?}", e.duration()))
714			})?
715			.as_secs();
716		// Save timestamps for current versionstamps
717		self.changefeed_versionstamp(ts).await?;
718		// Garbage old changefeed data from all databases
719		self.changefeed_cleanup(ts).await?;
720		// Everything ok
721		Ok(())
722	}
723
724	/// Run the background task to perform changefeed garbage collection
725	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
726	pub async fn changefeed_process_at(&self, ts: u64) -> Result<(), Error> {
727		// Output function invocation details to logs
728		trace!(target: TARGET, "Running changefeed garbage collection");
729		// Save timestamps for current versionstamps
730		self.changefeed_versionstamp(ts).await?;
731		// Garbage old changefeed data from all databases
732		self.changefeed_cleanup(ts).await?;
733		// Everything ok
734		Ok(())
735	}
736
737	/// Run the datastore shutdown tasks, perfoming any necessary cleanup
738	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
739	pub async fn shutdown(&self) -> Result<(), Error> {
740		// Output function invocation details to logs
741		trace!(target: TARGET, "Running datastore shutdown operations");
742		// Delete this datastore from the cluster
743		self.delete_node(self.id).await?;
744		// Run any storag engine shutdown tasks
745		match self.transaction_factory.flavor.as_ref() {
746			#[cfg(feature = "kv-mem")]
747			DatastoreFlavor::Mem(v) => v.shutdown().await,
748			#[cfg(feature = "kv-rocksdb")]
749			DatastoreFlavor::RocksDB(v) => v.shutdown().await,
750			#[cfg(feature = "kv-indxdb")]
751			DatastoreFlavor::IndxDB(v) => v.shutdown().await,
752			#[cfg(feature = "kv-tikv")]
753			DatastoreFlavor::TiKV(v) => v.shutdown().await,
754			#[cfg(feature = "kv-fdb")]
755			DatastoreFlavor::FoundationDB(v) => v.shutdown().await,
756			#[cfg(feature = "kv-surrealkv")]
757			DatastoreFlavor::SurrealKV(v) => v.shutdown().await,
758			#[cfg(feature = "kv-surrealcs")]
759			DatastoreFlavor::SurrealCS(v) => v.shutdown().await,
760			#[allow(unreachable_patterns)]
761			_ => unreachable!(),
762		}
763	}
764
765	/// Create a new transaction on this datastore
766	///
767	/// ```rust,no_run
768	/// use surrealdb_core::kvs::{Datastore, TransactionType::*, LockType::*};
769	/// use surrealdb_core::err::Error;
770	///
771	/// #[tokio::main]
772	/// async fn main() -> Result<(), Error> {
773	///     let ds = Datastore::new("file://database.db").await?;
774	///     let mut tx = ds.transaction(Write, Optimistic).await?;
775	///     tx.cancel().await?;
776	///     Ok(())
777	/// }
778	/// ```
779	#[allow(unreachable_code)]
780	pub async fn transaction(
781		&self,
782		write: TransactionType,
783		lock: LockType,
784	) -> Result<Transaction, Error> {
785		self.transaction_factory.transaction(write, lock).await
786	}
787
788	/// Parse and execute an SQL query
789	///
790	/// ```rust,no_run
791	/// use surrealdb_core::kvs::Datastore;
792	/// use surrealdb_core::err::Error;
793	/// use surrealdb_core::dbs::Session;
794	///
795	/// #[tokio::main]
796	/// async fn main() -> Result<(), Error> {
797	///     let ds = Datastore::new("memory").await?;
798	///     let ses = Session::owner();
799	///     let ast = "USE NS test DB test; SELECT * FROM person;";
800	///     let res = ds.execute(ast, &ses, None).await?;
801	///     Ok(())
802	/// }
803	/// ```
804	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
805	pub async fn execute(
806		&self,
807		txt: &str,
808		sess: &Session,
809		vars: Variables,
810	) -> Result<Vec<Response>, Error> {
811		// Parse the SQL query text
812		let ast = syn::parse_with_capabilities(txt, &self.capabilities)?;
813		// Process the AST
814		self.process(ast, sess, vars).await
815	}
816
817	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
818	pub async fn execute_import<S>(
819		&self,
820		sess: &Session,
821		vars: Variables,
822		query: S,
823	) -> Result<Vec<Response>, Error>
824	where
825		S: Stream<Item = Result<Bytes, Error>>,
826	{
827		// Check if the session has expired
828		if sess.expired() {
829			return Err(Error::ExpiredSession);
830		}
831
832		// Check if anonymous actors can execute queries when auth is enabled
833		// TODO(sgirones): Check this as part of the authorisation layer
834		self.check_anon(sess).map_err(|_| IamError::NotAllowed {
835			actor: "anonymous".to_string(),
836			action: "process".to_string(),
837			resource: "query".to_string(),
838		})?;
839
840		// Create a new query options
841		let opt = self.setup_options(sess);
842
843		// Create a default context
844		let mut ctx = self.setup_ctx()?;
845		// Start an execution context
846		sess.context(&mut ctx);
847		// Store the query variables
848		vars.attach(&mut ctx)?;
849		// Process all statements
850
851		let parser_settings = ParserSettings {
852			references_enabled: ctx
853				.get_capabilities()
854				.allows_experimental(&ExperimentalTarget::RecordReferences),
855			bearer_access_enabled: ctx
856				.get_capabilities()
857				.allows_experimental(&ExperimentalTarget::BearerAccess),
858			define_api_enabled: ctx
859				.get_capabilities()
860				.allows_experimental(&ExperimentalTarget::DefineApi),
861			..Default::default()
862		};
863		let mut statements_stream = StatementStream::new_with_settings(parser_settings);
864		let mut buffer = BytesMut::new();
865		let mut parse_size = 4096;
866		let mut bytes_stream = pin!(query);
867		let mut complete = false;
868		let mut filling = true;
869
870		let stream = futures::stream::poll_fn(move |cx| loop {
871			// fill the buffer to at least parse_size when filling is required.
872			while filling {
873				let bytes = ready!(bytes_stream.as_mut().poll_next(cx));
874				let bytes = match bytes {
875					Some(Err(e)) => return Poll::Ready(Some(Err(e))),
876					Some(Ok(x)) => x,
877					None => {
878						complete = true;
879						filling = false;
880						break;
881					}
882				};
883
884				buffer.extend_from_slice(&bytes);
885				filling = buffer.len() < parse_size
886			}
887
888			// if we finished streaming we can parse with complete so that the parser can be sure
889			// of it's results.
890			if complete {
891				return match statements_stream.parse_complete(&mut buffer) {
892					Err(e) => Poll::Ready(Some(Err(Error::InvalidQuery(e)))),
893					Ok(None) => Poll::Ready(None),
894					Ok(Some(x)) => Poll::Ready(Some(Ok(x))),
895				};
896			}
897
898			// otherwise try to parse a single statement.
899			match statements_stream.parse_partial(&mut buffer) {
900				Err(e) => return Poll::Ready(Some(Err(Error::InvalidQuery(e)))),
901				Ok(Some(x)) => return Poll::Ready(Some(Ok(x))),
902				Ok(None) => {
903					// Couldn't parse a statement for sure.
904					if buffer.len() >= parse_size && parse_size < u32::MAX as usize {
905						// the buffer already contained more or equal to parse_size bytes
906						// this means we are trying to parse a statement of more then buffer size.
907						// so we need to increase the buffer size.
908						parse_size = (parse_size + 1).next_power_of_two();
909					}
910					// start filling the buffer again.
911					filling = true;
912				}
913			}
914		});
915
916		Executor::execute_stream(self, Arc::new(ctx), opt, stream).await
917	}
918
919	/// Execute a pre-parsed SQL query
920	///
921	/// ```rust,no_run
922	/// use surrealdb_core::kvs::Datastore;
923	/// use surrealdb_core::err::Error;
924	/// use surrealdb_core::dbs::Session;
925	/// use surrealdb_core::sql::parse;
926	///
927	/// #[tokio::main]
928	/// async fn main() -> Result<(), Error> {
929	///     let ds = Datastore::new("memory").await?;
930	///     let ses = Session::owner();
931	///     let ast = parse("USE NS test DB test; SELECT * FROM person;")?;
932	///     let res = ds.process(ast, &ses, None).await?;
933	///     Ok(())
934	/// }
935	/// ```
936	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
937	pub async fn process(
938		&self,
939		ast: Query,
940		sess: &Session,
941		vars: Variables,
942	) -> Result<Vec<Response>, Error> {
943		// Check if the session has expired
944		if sess.expired() {
945			return Err(Error::ExpiredSession);
946		}
947		// Check if anonymous actors can execute queries when auth is enabled
948		// TODO(sgirones): Check this as part of the authorisation layer
949		self.check_anon(sess).map_err(|_| IamError::NotAllowed {
950			actor: "anonymous".to_string(),
951			action: "process".to_string(),
952			resource: "query".to_string(),
953		})?;
954
955		// Create a new query options
956		let opt = self.setup_options(sess);
957
958		// Create a default context
959		let mut ctx = self.setup_ctx()?;
960		// Start an execution context
961		sess.context(&mut ctx);
962		// Store the query variables
963		vars.attach(&mut ctx)?;
964		// Process all statements
965		Executor::execute(self, ctx.freeze(), opt, ast).await
966	}
967
968	/// Ensure a SQL [`Value`] is fully computed
969	///
970	/// ```rust,no_run
971	/// use surrealdb_core::kvs::Datastore;
972	/// use surrealdb_core::err::Error;
973	/// use surrealdb_core::dbs::Session;
974	/// use surrealdb_core::sql::Future;
975	/// use surrealdb_core::sql::Value;
976	///
977	/// #[tokio::main]
978	/// async fn main() -> Result<(), Error> {
979	///     let ds = Datastore::new("memory").await?;
980	///     let ses = Session::owner();
981	///     let val = Value::Future(Box::new(Future::from(Value::Bool(true))));
982	///     let res = ds.compute(val, &ses, None).await?;
983	///     Ok(())
984	/// }
985	/// ```
986	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
987	pub async fn compute(
988		&self,
989		val: Value,
990		sess: &Session,
991		vars: Variables,
992	) -> Result<Value, Error> {
993		// Check if the session has expired
994		if sess.expired() {
995			return Err(Error::ExpiredSession);
996		}
997		// Check if anonymous actors can compute values when auth is enabled
998		// TODO(sgirones): Check this as part of the authorisation layer
999		self.check_anon(sess).map_err(|_| IamError::NotAllowed {
1000			actor: "anonymous".to_string(),
1001			action: "compute".to_string(),
1002			resource: "value".to_string(),
1003		})?;
1004
1005		// Create a new memory stack
1006		let mut stack = TreeStack::new();
1007		// Create a new query options
1008		let opt = self.setup_options(sess);
1009		// Create a default context
1010		let mut ctx = MutableContext::default();
1011		// Set context capabilities
1012		ctx.add_capabilities(self.capabilities.clone());
1013		// Set the global query timeout
1014		if let Some(timeout) = self.query_timeout {
1015			ctx.add_timeout(timeout)?;
1016		}
1017		// Setup the notification channel
1018		if let Some(channel) = &self.notification_channel {
1019			ctx.add_notifications(Some(&channel.0));
1020		}
1021		// Start an execution context
1022		sess.context(&mut ctx);
1023		// Store the query variables
1024		vars.attach(&mut ctx)?;
1025		// Start a new transaction
1026		let txn = self.transaction(val.writeable().into(), Optimistic).await?.enclose();
1027		// Store the transaction
1028		ctx.set_transaction(txn.clone());
1029		// Freeze the context
1030		let ctx = ctx.freeze();
1031		// Compute the value
1032		let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await;
1033		// Store any data
1034		match (res.is_ok(), val.writeable()) {
1035			// If the compute was successful, then commit if writeable
1036			(true, true) => txn.commit().await?,
1037			// Cancel if the compute was an error, or if readonly
1038			(_, _) => txn.cancel().await?,
1039		};
1040		// Return result
1041		res
1042	}
1043
1044	/// Evaluates a SQL [`Value`] without checking authenticating config
1045	/// This is used in very specific cases, where we do not need to check
1046	/// whether authentication is enabled, or guest access is disabled.
1047	/// For example, this is used when processing a record access SIGNUP or
1048	/// SIGNIN clause, which still needs to work without guest access.
1049	///
1050	/// ```rust,no_run
1051	/// use surrealdb_core::kvs::Datastore;
1052	/// use surrealdb_core::err::Error;
1053	/// use surrealdb_core::dbs::Session;
1054	/// use surrealdb_core::sql::Future;
1055	/// use surrealdb_core::sql::Value;
1056	///
1057	/// #[tokio::main]
1058	/// async fn main() -> Result<(), Error> {
1059	///     let ds = Datastore::new("memory").await?;
1060	///     let ses = Session::owner();
1061	///     let val = Value::Future(Box::new(Future::from(Value::Bool(true))));
1062	///     let res = ds.evaluate(&val, &ses, None).await?;
1063	///     Ok(())
1064	/// }
1065	/// ```
1066	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1067	pub async fn evaluate(
1068		&self,
1069		val: &Value,
1070		sess: &Session,
1071		vars: Variables,
1072	) -> Result<Value, Error> {
1073		// Check if the session has expired
1074		if sess.expired() {
1075			return Err(Error::ExpiredSession);
1076		}
1077		// Create a new memory stack
1078		let mut stack = TreeStack::new();
1079		// Create a new query options
1080		let opt = self.setup_options(sess);
1081		// Create a default context
1082		let mut ctx = MutableContext::default();
1083		// Set context capabilities
1084		ctx.add_capabilities(self.capabilities.clone());
1085		// Set the global query timeout
1086		if let Some(timeout) = self.query_timeout {
1087			ctx.add_timeout(timeout)?;
1088		}
1089		// Setup the notification channel
1090		if let Some(channel) = &self.notification_channel {
1091			ctx.add_notifications(Some(&channel.0));
1092		}
1093		// Start an execution context
1094		sess.context(&mut ctx);
1095		// Store the query variables
1096		vars.attach(&mut ctx)?;
1097		// Start a new transaction
1098		let txn = self.transaction(val.writeable().into(), Optimistic).await?.enclose();
1099		// Store the transaction
1100		ctx.set_transaction(txn.clone());
1101		// Freeze the context
1102		let ctx = ctx.freeze();
1103		// Compute the value
1104		let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await;
1105		// Store any data
1106		match (res.is_ok(), val.writeable()) {
1107			// If the compute was successful, then commit if writeable
1108			(true, true) => txn.commit().await?,
1109			// Cancel if the compute was an error, or if readonly
1110			(_, _) => txn.cancel().await?,
1111		};
1112		// Return result
1113		res
1114	}
1115
1116	/// Subscribe to live notifications
1117	///
1118	/// ```rust,no_run
1119	/// use surrealdb_core::kvs::Datastore;
1120	/// use surrealdb_core::err::Error;
1121	/// use surrealdb_core::dbs::Session;
1122	///
1123	/// #[tokio::main]
1124	/// async fn main() -> Result<(), Error> {
1125	///     let ds = Datastore::new("memory").await?.with_notifications();
1126	///     let ses = Session::owner();
1127	/// 	if let Some(channel) = ds.notifications() {
1128	///     	while let Ok(v) = channel.recv().await {
1129	///     	    println!("Received notification: {v}");
1130	///     	}
1131	/// 	}
1132	///     Ok(())
1133	/// }
1134	/// ```
1135	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1136	pub fn notifications(&self) -> Option<Receiver<Notification>> {
1137		self.notification_channel.as_ref().map(|v| v.1.clone())
1138	}
1139
1140	/// Performs a database import from SQL
1141	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1142	pub async fn import(&self, sql: &str, sess: &Session) -> Result<Vec<Response>, Error> {
1143		// Check if the session has expired
1144		if sess.expired() {
1145			return Err(Error::ExpiredSession);
1146		}
1147		// Execute the SQL import
1148		self.execute(sql, sess, None).await
1149	}
1150
1151	/// Performs a database import from SQL
1152	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1153	pub async fn import_stream<S>(&self, sess: &Session, stream: S) -> Result<Vec<Response>, Error>
1154	where
1155		S: Stream<Item = Result<Bytes, Error>>,
1156	{
1157		// Check if the session has expired
1158		if sess.expired() {
1159			return Err(Error::ExpiredSession);
1160		}
1161		// Execute the SQL import
1162		self.execute_import(sess, None, stream).await
1163	}
1164
1165	/// Performs a full database export as SQL
1166	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1167	pub async fn export(
1168		&self,
1169		sess: &Session,
1170		chn: Sender<Vec<u8>>,
1171	) -> Result<impl Future<Output = Result<(), Error>>, Error> {
1172		// Create a default export config
1173		let cfg = super::export::Config::default();
1174		self.export_with_config(sess, chn, cfg).await
1175	}
1176
1177	/// Performs a full database export as SQL
1178	#[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1179	pub async fn export_with_config(
1180		&self,
1181		sess: &Session,
1182		chn: Sender<Vec<u8>>,
1183		cfg: export::Config,
1184	) -> Result<impl Future<Output = Result<(), Error>>, Error> {
1185		// Check if the session has expired
1186		if sess.expired() {
1187			return Err(Error::ExpiredSession);
1188		}
1189		// Retrieve the provided NS and DB
1190		let (ns, db) = crate::iam::check::check_ns_db(sess)?;
1191		// Create a new readonly transaction
1192		let txn = self.transaction(Read, Optimistic).await?;
1193		// Return an async export job
1194		Ok(async move {
1195			// Process the export
1196			txn.export(&ns, &db, cfg, chn).await?;
1197			// Everything ok
1198			Ok(())
1199		})
1200	}
1201
1202	/// Checks the required permissions level for this session
1203	#[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self, sess))]
1204	pub fn check(&self, sess: &Session, action: Action, resource: Resource) -> Result<(), Error> {
1205		// Check if the session has expired
1206		if sess.expired() {
1207			return Err(Error::ExpiredSession);
1208		}
1209		// Skip auth for Anonymous users if auth is disabled
1210		let skip_auth = !self.is_auth_enabled() && sess.au.is_anon();
1211		if !skip_auth {
1212			sess.au.is_allowed(action, &resource)?;
1213		}
1214		// All ok
1215		Ok(())
1216	}
1217
1218	pub fn setup_options(&self, sess: &Session) -> Options {
1219		Options::default()
1220			.with_id(self.id)
1221			.with_ns(sess.ns())
1222			.with_db(sess.db())
1223			.with_live(sess.live())
1224			.with_auth(sess.au.clone())
1225			.with_strict(self.strict)
1226			.with_auth_enabled(self.auth_enabled)
1227	}
1228	pub fn setup_ctx(&self) -> Result<MutableContext, Error> {
1229		let mut ctx = MutableContext::from_ds(
1230			self.query_timeout,
1231			self.capabilities.clone(),
1232			self.index_stores.clone(),
1233			self.cache.clone(),
1234			#[cfg(not(target_family = "wasm"))]
1235			self.index_builder.clone(),
1236			#[cfg(storage)]
1237			self.temporary_directory.clone(),
1238		)?;
1239		// Setup the notification channel
1240		if let Some(channel) = &self.notification_channel {
1241			ctx.add_notifications(Some(&channel.0));
1242		}
1243		Ok(ctx)
1244	}
1245
1246	/// check for disallowed anonymous users
1247	pub fn check_anon(&self, sess: &Session) -> Result<(), IamError> {
1248		if self.auth_enabled && sess.au.is_anon() && !self.capabilities.allows_guest_access() {
1249			Err(IamError::NotAllowed {
1250				actor: "anonymous".to_string(),
1251				action: String::new(),
1252				resource: String::new(),
1253			})
1254		} else {
1255			Ok(())
1256		}
1257	}
1258}
1259
1260#[cfg(test)]
1261mod test {
1262	use super::*;
1263
1264	#[tokio::test]
1265	pub async fn very_deep_query() -> Result<(), Error> {
1266		use crate::kvs::Datastore;
1267		use crate::sql::{Expression, Future, Number, Operator, Value};
1268		use reblessive::{Stack, Stk};
1269
1270		// build query manually to bypass query limits.
1271		let mut stack = Stack::new();
1272		async fn build_query(stk: &mut Stk, depth: usize) -> Value {
1273			if depth == 0 {
1274				Value::Expression(Box::new(Expression::Binary {
1275					l: Value::Number(Number::Int(1)),
1276					o: Operator::Add,
1277					r: Value::Number(Number::Int(1)),
1278				}))
1279			} else {
1280				let q = stk.run(|stk| build_query(stk, depth - 1)).await;
1281				Value::Future(Box::new(Future::from(q)))
1282			}
1283		}
1284		let val = stack.enter(|stk| build_query(stk, 1000)).finish();
1285
1286		let dbs = Datastore::new("memory").await.unwrap().with_capabilities(Capabilities::all());
1287
1288		let opt = Options::default()
1289			.with_id(dbs.id)
1290			.with_ns(Some("test".into()))
1291			.with_db(Some("test".into()))
1292			.with_live(false)
1293			.with_strict(false)
1294			.with_auth_enabled(false)
1295			.with_max_computation_depth(u32::MAX)
1296			.with_futures(true);
1297
1298		// Create a default context
1299		let mut ctx = MutableContext::default();
1300		// Set context capabilities
1301		ctx.add_capabilities(dbs.capabilities.clone());
1302		// Start a new transaction
1303		let txn = dbs.transaction(val.writeable().into(), Optimistic).await?;
1304		// Store the transaction
1305		ctx.set_transaction(txn.enclose());
1306		// Freeze the context
1307		let ctx = ctx.freeze();
1308		// Compute the value
1309		let mut stack = reblessive::tree::TreeStack::new();
1310		let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await.unwrap();
1311		assert_eq!(res, Value::Number(Number::Int(2)));
1312		Ok(())
1313	}
1314}