1use super::export;
2use super::tr::Transactor;
3use super::tx::Transaction;
4use super::version::Version;
5use crate::cf;
6use crate::ctx::MutableContext;
7#[cfg(feature = "jwks")]
8use crate::dbs::capabilities::NetTarget;
9use crate::dbs::capabilities::{
10 ArbitraryQueryTarget, ExperimentalTarget, MethodTarget, RouteTarget,
11};
12use crate::dbs::node::Timestamp;
13use crate::dbs::{
14 Attach, Capabilities, Executor, Notification, Options, Response, Session, Variables,
15};
16use crate::err::Error;
17#[cfg(feature = "jwks")]
18use crate::iam::jwks::JwksCache;
19use crate::iam::{Action, Auth, Error as IamError, Resource, Role};
20use crate::idx::trees::store::IndexStores;
21use crate::kvs::cache::ds::DatastoreCache;
22use crate::kvs::clock::SizedClock;
23#[allow(unused_imports)]
24use crate::kvs::clock::SystemClock;
25#[cfg(not(target_family = "wasm"))]
26use crate::kvs::index::IndexBuilder;
27use crate::kvs::{LockType, LockType::*, TransactionType, TransactionType::*};
28use crate::sql::{statements::DefineUserStatement, Base, Query, Value};
29use crate::syn;
30use crate::syn::parser::{ParserSettings, StatementStream};
31use async_channel::{Receiver, Sender};
32use bytes::{Bytes, BytesMut};
33use futures::{Future, Stream};
34use reblessive::TreeStack;
35use std::fmt;
36#[cfg(storage)]
37use std::path::PathBuf;
38use std::pin::pin;
39use std::sync::Arc;
40use std::task::{ready, Poll};
41use std::time::Duration;
42#[cfg(not(target_family = "wasm"))]
43use std::time::{SystemTime, UNIX_EPOCH};
44#[cfg(feature = "jwks")]
45use tokio::sync::RwLock;
46use tracing::instrument;
47use tracing::trace;
48use uuid::Uuid;
49#[cfg(target_family = "wasm")]
50use wasmtimer::std::{SystemTime, UNIX_EPOCH};
51
/// Log target passed to the `info!`, `warn!` and `trace!` calls in this module.
const TARGET: &str = "surrealdb::core::kvs::ds";

/// Capacity of the bounded channel created by `Datastore::with_notifications`.
const LQ_CHANNEL_SIZE: usize = 15_000;

/// Role name given to the root user created by `Datastore::initialise_credentials`.
const INITIAL_USER_ROLE: &str = "owner";
59
/// The top-level handle used to open transactions and run queries against
/// one of the supported key-value storage engines.
#[allow(dead_code)]
#[non_exhaustive]
pub struct Datastore {
    /// Opens transactions on the configured storage engine.
    transaction_factory: TransactionFactory,
    /// Identifier of this datastore node; a random v4 UUID unless replaced
    /// through `with_node_id`.
    id: Uuid,
    /// Strict-mode flag, forwarded to `Options::with_strict`.
    strict: bool,
    /// Whether authentication is enabled; consulted by `check` and `check_anon`.
    auth_enabled: bool,
    /// Optional timeout attached to every query context.
    query_timeout: Option<Duration>,
    /// Optional transaction timeout, set through `with_transaction_timeout`.
    // NOTE(review): this value is stored but not read anywhere in this file.
    transaction_timeout: Option<Duration>,
    /// Capabilities shared with every context created by this datastore.
    capabilities: Arc<Capabilities>,
    /// Sender/receiver pair for notifications; `None` until
    /// `with_notifications` is called.
    notification_channel: Option<(Sender<Notification>, Receiver<Notification>)>,
    /// Index stores handed to each query context.
    index_stores: IndexStores,
    /// Datastore-level cache handed to each query context.
    cache: Arc<DatastoreCache>,
    /// Index builder handed to each query context (not available on wasm).
    #[cfg(not(target_family = "wasm"))]
    index_builder: IndexBuilder,
    /// Shared JWKS cache, exposed through `jwks_cache()`.
    #[cfg(feature = "jwks")]
    jwks_cache: Arc<RwLock<JwksCache>>,
    /// Optional temporary directory handed to each query context.
    #[cfg(storage)]
    temporary_directory: Option<Arc<PathBuf>>,
}
93
/// Cheaply cloneable handle that opens transactions on a storage engine.
///
/// Both fields are reference counted, so clones share the same clock and
/// the same underlying engine.
#[derive(Clone)]
pub(super) struct TransactionFactory {
    /// Clock handed to every `Transactor` this factory creates.
    clock: Arc<SizedClock>,
    /// The concrete storage engine backing the datastore.
    flavor: Arc<DatastoreFlavor>,
}
101
102impl TransactionFactory {
103 #[allow(unreachable_code)]
104 pub async fn transaction(
105 &self,
106 write: TransactionType,
107 lock: LockType,
108 ) -> Result<Transaction, Error> {
109 #[allow(unused_variables)]
111 let write = match write {
112 Read => false,
113 Write => true,
114 };
115 #[allow(unused_variables)]
117 let lock = match lock {
118 Pessimistic => true,
119 Optimistic => false,
120 };
121 #[allow(unused_variables)]
123 let (inner, local) = match self.flavor.as_ref() {
124 #[cfg(feature = "kv-mem")]
125 DatastoreFlavor::Mem(v) => {
126 let tx = v.transaction(write, lock).await?;
127 (super::tr::Inner::Mem(tx), true)
128 }
129 #[cfg(feature = "kv-rocksdb")]
130 DatastoreFlavor::RocksDB(v) => {
131 let tx = v.transaction(write, lock).await?;
132 (super::tr::Inner::RocksDB(tx), true)
133 }
134 #[cfg(feature = "kv-indxdb")]
135 DatastoreFlavor::IndxDB(v) => {
136 let tx = v.transaction(write, lock).await?;
137 (super::tr::Inner::IndxDB(tx), true)
138 }
139 #[cfg(feature = "kv-tikv")]
140 DatastoreFlavor::TiKV(v) => {
141 let tx = v.transaction(write, lock).await?;
142 (super::tr::Inner::TiKV(tx), false)
143 }
144 #[cfg(feature = "kv-fdb")]
145 DatastoreFlavor::FoundationDB(v) => {
146 let tx = v.transaction(write, lock).await?;
147 (super::tr::Inner::FoundationDB(tx), false)
148 }
149 #[cfg(feature = "kv-surrealkv")]
150 DatastoreFlavor::SurrealKV(v) => {
151 let tx = v.transaction(write, lock).await?;
152 (super::tr::Inner::SurrealKV(tx), true)
153 }
154 #[cfg(feature = "kv-surrealcs")]
155 DatastoreFlavor::SurrealCS(v) => {
156 let tx = v.transaction(write, lock).await?;
157 (super::tr::Inner::SurrealCS(tx), false)
158 }
159 #[allow(unreachable_patterns)]
160 _ => unreachable!(),
161 };
162 Ok(Transaction::new(
163 local,
164 Transactor {
165 inner,
166 stash: super::stash::Stash::default(),
167 cf: cf::Writer::new(),
168 clock: self.clock.clone(),
169 },
170 ))
171 }
172}
173
/// The concrete storage engine behind a datastore.
///
/// Each variant only exists when the matching `kv-*` cargo feature is
/// enabled, so every `match` on this enum carries a cfg-gated arm per engine.
#[allow(clippy::large_enum_variant)]
pub(super) enum DatastoreFlavor {
    /// In-memory engine, selected by the `memory` path.
    #[cfg(feature = "kv-mem")]
    Mem(super::mem::Datastore),
    /// RocksDB engine, selected by `rocksdb:` and the deprecated `file:` paths.
    #[cfg(feature = "kv-rocksdb")]
    RocksDB(super::rocksdb::Datastore),
    /// IndxDB engine, selected by `indxdb:` paths.
    #[cfg(feature = "kv-indxdb")]
    IndxDB(super::indxdb::Datastore),
    /// TiKV engine, selected by `tikv:` paths.
    #[cfg(feature = "kv-tikv")]
    TiKV(super::tikv::Datastore),
    /// FoundationDB engine, selected by `fdb:` paths.
    #[cfg(feature = "kv-fdb")]
    FoundationDB(super::fdb::Datastore),
    /// SurrealKV engine, selected by paths starting with `surrealkv`.
    #[cfg(feature = "kv-surrealkv")]
    SurrealKV(super::surrealkv::Datastore),
    /// SurrealCS engine, selected by `surrealcs:` paths.
    #[cfg(feature = "kv-surrealcs")]
    SurrealCS(super::surrealcs::Datastore),
}
191
192impl fmt::Display for Datastore {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 #![allow(unused_variables)]
195 match self.transaction_factory.flavor.as_ref() {
196 #[cfg(feature = "kv-mem")]
197 DatastoreFlavor::Mem(_) => write!(f, "memory"),
198 #[cfg(feature = "kv-rocksdb")]
199 DatastoreFlavor::RocksDB(_) => write!(f, "rocksdb"),
200 #[cfg(feature = "kv-indxdb")]
201 DatastoreFlavor::IndxDB(_) => write!(f, "indxdb"),
202 #[cfg(feature = "kv-tikv")]
203 DatastoreFlavor::TiKV(_) => write!(f, "tikv"),
204 #[cfg(feature = "kv-fdb")]
205 DatastoreFlavor::FoundationDB(_) => write!(f, "fdb"),
206 #[cfg(feature = "kv-surrealkv")]
207 DatastoreFlavor::SurrealKV(_) => write!(f, "surrealkv"),
208 #[cfg(feature = "kv-surrealcs")]
209 DatastoreFlavor::SurrealCS(_) => write!(f, "surrealcs"),
210 #[allow(unreachable_patterns)]
211 _ => unreachable!(),
212 }
213 }
214}
215
216impl Datastore {
217 pub async fn new(path: &str) -> Result<Self, Error> {
255 Self::new_with_clock(path, None).await
256 }
257
258 #[allow(unused_variables)]
259 pub async fn new_with_clock(
260 path: &str,
261 clock: Option<Arc<SizedClock>>,
262 ) -> Result<Datastore, Error> {
263 let (flavor, clock): (Result<DatastoreFlavor, Error>, Arc<SizedClock>) = match path {
265 "memory" => {
267 #[cfg(feature = "kv-mem")]
268 {
269 info!(target: TARGET, "Starting kvs store in {}", path);
271 let v = super::mem::Datastore::new().await.map(DatastoreFlavor::Mem);
272 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
273 info!(target: TARGET, "Started kvs store in {}", path);
274 Ok((v, c))
275 }
276 #[cfg(not(feature = "kv-mem"))]
277 return Err(Error::Ds("Cannot connect to the `memory` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
278 }
279 s if s.starts_with("file:") => {
281 #[cfg(feature = "kv-rocksdb")]
282 {
283 super::threadpool::initialise();
285 info!(target: TARGET, "Starting kvs store at {}", path);
287 warn!("file:// is deprecated, please use surrealkv:// or rocksdb://");
288 let s = s.trim_start_matches("file://");
289 let s = s.trim_start_matches("file:");
290 let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB);
291 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
292 info!(target: TARGET, "Started kvs store at {}", path);
293 Ok((v, c))
294 }
295 #[cfg(not(feature = "kv-rocksdb"))]
296 return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
297 }
298 s if s.starts_with("rocksdb:") => {
300 #[cfg(feature = "kv-rocksdb")]
301 {
302 super::threadpool::initialise();
304 info!(target: TARGET, "Starting kvs store at {}", path);
306 let s = s.trim_start_matches("rocksdb://");
307 let s = s.trim_start_matches("rocksdb:");
308 let v = super::rocksdb::Datastore::new(s).await.map(DatastoreFlavor::RocksDB);
309 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
310 info!(target: TARGET, "Started kvs store at {}", path);
311 Ok((v, c))
312 }
313 #[cfg(not(feature = "kv-rocksdb"))]
314 return Err(Error::Ds("Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
315 }
316 s if s.starts_with("surrealkv") => {
318 #[cfg(feature = "kv-surrealkv")]
319 {
320 super::threadpool::initialise();
322 info!(target: TARGET, "Starting kvs store at {}", s);
324 let (path, enable_versions) =
325 super::surrealkv::Datastore::parse_start_string(s)?;
326 let v = super::surrealkv::Datastore::new(path, enable_versions)
327 .await
328 .map(DatastoreFlavor::SurrealKV);
329 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
330 info!(target: TARGET, "Started kvs store at {} with versions {}", path, if enable_versions { "enabled" } else { "disabled" });
331 Ok((v, c))
332 }
333 #[cfg(not(feature = "kv-surrealkv"))]
334 return Err(Error::Ds("Cannot connect to the `surrealkv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
335 }
336 s if s.starts_with("surrealcs:") => {
338 #[cfg(feature = "kv-surrealcs")]
339 {
340 info!(target: TARGET, "Starting kvs store at {}", path);
341 let s = s.trim_start_matches("surrealcs://");
342 let s = s.trim_start_matches("surrealcs:");
343 let v =
344 super::surrealcs::Datastore::new(s).await.map(DatastoreFlavor::SurrealCS);
345 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
346 info!(target: TARGET, "Started kvs store at {}", path);
347 Ok((v, c))
348 }
349 #[cfg(not(feature = "kv-surrealcs"))]
350 return Err(Error::Ds("Cannot connect to the `surrealcs` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
351 }
352 s if s.starts_with("indxdb:") => {
354 #[cfg(feature = "kv-indxdb")]
355 {
356 info!(target: TARGET, "Starting kvs store at {}", path);
357 let s = s.trim_start_matches("indxdb://");
358 let s = s.trim_start_matches("indxdb:");
359 let v = super::indxdb::Datastore::new(s).await.map(DatastoreFlavor::IndxDB);
360 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
361 info!(target: TARGET, "Started kvs store at {}", path);
362 Ok((v, c))
363 }
364 #[cfg(not(feature = "kv-indxdb"))]
365 return Err(Error::Ds("Cannot connect to the `indxdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
366 }
367 s if s.starts_with("tikv:") => {
369 #[cfg(feature = "kv-tikv")]
370 {
371 info!(target: TARGET, "Connecting to kvs store at {}", path);
372 let s = s.trim_start_matches("tikv://");
373 let s = s.trim_start_matches("tikv:");
374 let v = super::tikv::Datastore::new(s).await.map(DatastoreFlavor::TiKV);
375 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
376 info!(target: TARGET, "Connected to kvs store at {}", path);
377 Ok((v, c))
378 }
379 #[cfg(not(feature = "kv-tikv"))]
380 return Err(Error::Ds("Cannot connect to the `tikv` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
381 }
382 s if s.starts_with("fdb:") => {
384 #[cfg(feature = "kv-fdb")]
385 {
386 info!(target: TARGET, "Connecting to kvs store at {}", path);
387 let s = s.trim_start_matches("fdb://");
388 let s = s.trim_start_matches("fdb:");
389 let v = super::fdb::Datastore::new(s).await.map(DatastoreFlavor::FoundationDB);
390 let c = clock.unwrap_or_else(|| Arc::new(SizedClock::system()));
391 info!(target: TARGET, "Connected to kvs store at {}", path);
392 Ok((v, c))
393 }
394 #[cfg(not(feature = "kv-fdb"))]
395 return Err(Error::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()));
396 }
397 _ => {
399 info!(target: TARGET, "Unable to load the specified datastore {}", path);
400 Err(Error::Ds("Unable to load the specified datastore".into()))
401 }
402 }?;
403 flavor.map(|flavor| {
405 let tf = TransactionFactory {
406 clock,
407 flavor: Arc::new(flavor),
408 };
409 Self {
410 id: Uuid::new_v4(),
411 transaction_factory: tf.clone(),
412 strict: false,
413 auth_enabled: false,
414 query_timeout: None,
415 transaction_timeout: None,
416 notification_channel: None,
417 capabilities: Arc::new(Capabilities::default()),
418 index_stores: IndexStores::default(),
419 #[cfg(not(target_family = "wasm"))]
420 index_builder: IndexBuilder::new(tf),
421 #[cfg(feature = "jwks")]
422 jwks_cache: Arc::new(RwLock::new(JwksCache::new())),
423 #[cfg(storage)]
424 temporary_directory: None,
425 cache: Arc::new(DatastoreCache::new()),
426 }
427 })
428 }
429
430 #[allow(dead_code)]
433 pub fn restart(self) -> Self {
434 Self {
435 id: self.id,
436 strict: self.strict,
437 auth_enabled: self.auth_enabled,
438 query_timeout: self.query_timeout,
439 transaction_timeout: self.transaction_timeout,
440 capabilities: self.capabilities,
441 notification_channel: self.notification_channel,
442 index_stores: Default::default(),
443 #[cfg(not(target_family = "wasm"))]
444 index_builder: IndexBuilder::new(self.transaction_factory.clone()),
445 #[cfg(feature = "jwks")]
446 jwks_cache: Arc::new(Default::default()),
447 #[cfg(storage)]
448 temporary_directory: self.temporary_directory,
449 transaction_factory: self.transaction_factory,
450 cache: Arc::new(DatastoreCache::new()),
451 }
452 }
453
454 pub fn with_node_id(mut self, id: Uuid) -> Self {
456 self.id = id;
457 self
458 }
459
460 pub fn with_strict_mode(mut self, strict: bool) -> Self {
462 self.strict = strict;
463 self
464 }
465
466 pub fn with_notifications(mut self) -> Self {
468 self.notification_channel = Some(async_channel::bounded(LQ_CHANNEL_SIZE));
469 self
470 }
471
472 pub fn with_query_timeout(mut self, duration: Option<Duration>) -> Self {
474 self.query_timeout = duration;
475 self
476 }
477
478 pub fn with_transaction_timeout(mut self, duration: Option<Duration>) -> Self {
480 self.transaction_timeout = duration;
481 self
482 }
483
484 pub fn with_auth_enabled(mut self, enabled: bool) -> Self {
486 self.auth_enabled = enabled;
487 self
488 }
489
490 pub fn with_capabilities(mut self, caps: Capabilities) -> Self {
492 self.capabilities = Arc::new(caps);
493 self
494 }
495
496 #[cfg(storage)]
497 pub fn with_temporary_directory(mut self, path: Option<PathBuf>) -> Self {
499 self.temporary_directory = path.map(Arc::new);
500 self
501 }
502
503 pub fn index_store(&self) -> &IndexStores {
504 &self.index_stores
505 }
506
507 pub fn is_auth_enabled(&self) -> bool {
509 self.auth_enabled
510 }
511
512 pub fn id(&self) -> Uuid {
513 self.id
514 }
515
516 pub(crate) fn allows_rpc_method(&self, method_target: &MethodTarget) -> bool {
518 self.capabilities.allows_rpc_method(method_target)
519 }
520
521 pub fn allows_http_route(&self, route_target: &RouteTarget) -> bool {
524 self.capabilities.allows_http_route(route_target)
525 }
526
527 pub fn allows_query_by_subject(&self, subject: impl Into<ArbitraryQueryTarget>) -> bool {
530 self.capabilities.allows_query(&subject.into())
531 }
532
533 #[cfg(feature = "jwks")]
535 pub(crate) fn allows_network_target(&self, net_target: &NetTarget) -> bool {
536 self.capabilities.allows_network_target(net_target)
537 }
538
539 pub fn get_capabilities(&self) -> &Capabilities {
541 &self.capabilities
542 }
543
544 #[cfg(feature = "jwks")]
545 pub(crate) fn jwks_cache(&self) -> &Arc<RwLock<JwksCache>> {
546 &self.jwks_cache
547 }
548
549 pub(super) async fn clock_now(&self) -> Timestamp {
550 self.transaction_factory.clock.now().await
551 }
552
553 #[allow(dead_code)]
555 pub fn get_cache(&self) -> Arc<DatastoreCache> {
556 self.cache.clone()
557 }
558
559 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
561 pub async fn check_version(&self) -> Result<Version, Error> {
562 let version = self.get_version().await?;
563 if !version.is_latest() {
565 return Err(Error::OutdatedStorageVersion);
566 }
567 Ok(version)
569 }
570
571 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
573 pub async fn get_version(&self) -> Result<Version, Error> {
574 let txn = self.transaction(Write, Pessimistic).await?.enclose();
576 let key = crate::key::version::new();
578 let val = match catch!(txn, txn.get(key.clone(), None).await) {
580 Some(v) => {
582 let val = TryInto::<Version>::try_into(v);
584 match val {
586 Err(err) => {
588 catch!(txn, txn.cancel().await);
590 return Err(err);
592 }
593 Ok(val) => {
595 catch!(txn, txn.cancel().await);
597 val
599 }
600 }
601 }
602 None => {
604 let rng = crate::key::version::proceeding();
606 let keys = catch!(txn, txn.keys(rng, 1, None).await);
607 let val = if keys.is_empty() {
609 Version::latest()
611 } else {
612 Version::v1()
614 };
615 let bytes: Vec<u8> = val.into();
617 catch!(txn, txn.replace(key, bytes).await);
619 catch!(txn, txn.commit().await);
621 val
623 }
624 };
625 Ok(val)
627 }
628
629 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
631 pub async fn initialise_credentials(&self, user: &str, pass: &str) -> Result<(), Error> {
632 let txn = self.transaction(Write, Optimistic).await?.enclose();
634 let users = catch!(txn, txn.all_root_users().await);
636 if users.is_empty() {
638 info!(target: TARGET, "Credentials were provided, and no root users were found. The root user '{user}' will be created");
640 let stm = DefineUserStatement::from((Base::Root, user, pass, INITIAL_USER_ROLE));
642 let opt = Options::new().with_auth(Arc::new(Auth::for_root(Role::Owner)));
643 let mut ctx = MutableContext::default();
644 ctx.set_transaction(txn.clone());
645 let ctx = ctx.freeze();
646 catch!(txn, stm.compute(&ctx, &opt, None).await);
647 txn.commit().await
649 } else {
650 warn!(target: TARGET, "Credentials were provided, but existing root users were found. The root user '{user}' will not be created");
652 warn!(target: TARGET, "Consider removing the --user and --pass arguments from the server start command");
653 txn.cancel().await
655 }
656 }
657
658 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::ds", skip_all)]
660 pub async fn bootstrap(&self) -> Result<(), Error> {
661 self.insert_node(self.id).await?;
663 self.expire_nodes().await?;
665 self.remove_nodes().await?;
667 Ok(())
669 }
670
671 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
673 pub async fn node_membership_update(&self) -> Result<(), Error> {
674 trace!(target: TARGET, "Updating node registration information");
676 self.update_node(self.id).await?;
678 Ok(())
680 }
681
682 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
684 pub async fn node_membership_expire(&self) -> Result<(), Error> {
685 trace!(target: TARGET, "Processing and archiving inactive nodes");
687 self.expire_nodes().await?;
689 Ok(())
691 }
692
693 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
695 pub async fn node_membership_remove(&self) -> Result<(), Error> {
696 trace!(target: TARGET, "Processing and cleaning archived nodes");
698 self.remove_nodes().await?;
700 Ok(())
702 }
703
704 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
706 pub async fn changefeed_process(&self) -> Result<(), Error> {
707 trace!(target: TARGET, "Running changefeed garbage collection");
709 let ts = SystemTime::now()
711 .duration_since(UNIX_EPOCH)
712 .map_err(|e| {
713 Error::Internal(format!("Clock may have gone backwards: {:?}", e.duration()))
714 })?
715 .as_secs();
716 self.changefeed_versionstamp(ts).await?;
718 self.changefeed_cleanup(ts).await?;
720 Ok(())
722 }
723
724 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
726 pub async fn changefeed_process_at(&self, ts: u64) -> Result<(), Error> {
727 trace!(target: TARGET, "Running changefeed garbage collection");
729 self.changefeed_versionstamp(ts).await?;
731 self.changefeed_cleanup(ts).await?;
733 Ok(())
735 }
736
737 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self))]
739 pub async fn shutdown(&self) -> Result<(), Error> {
740 trace!(target: TARGET, "Running datastore shutdown operations");
742 self.delete_node(self.id).await?;
744 match self.transaction_factory.flavor.as_ref() {
746 #[cfg(feature = "kv-mem")]
747 DatastoreFlavor::Mem(v) => v.shutdown().await,
748 #[cfg(feature = "kv-rocksdb")]
749 DatastoreFlavor::RocksDB(v) => v.shutdown().await,
750 #[cfg(feature = "kv-indxdb")]
751 DatastoreFlavor::IndxDB(v) => v.shutdown().await,
752 #[cfg(feature = "kv-tikv")]
753 DatastoreFlavor::TiKV(v) => v.shutdown().await,
754 #[cfg(feature = "kv-fdb")]
755 DatastoreFlavor::FoundationDB(v) => v.shutdown().await,
756 #[cfg(feature = "kv-surrealkv")]
757 DatastoreFlavor::SurrealKV(v) => v.shutdown().await,
758 #[cfg(feature = "kv-surrealcs")]
759 DatastoreFlavor::SurrealCS(v) => v.shutdown().await,
760 #[allow(unreachable_patterns)]
761 _ => unreachable!(),
762 }
763 }
764
765 #[allow(unreachable_code)]
780 pub async fn transaction(
781 &self,
782 write: TransactionType,
783 lock: LockType,
784 ) -> Result<Transaction, Error> {
785 self.transaction_factory.transaction(write, lock).await
786 }
787
788 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
805 pub async fn execute(
806 &self,
807 txt: &str,
808 sess: &Session,
809 vars: Variables,
810 ) -> Result<Vec<Response>, Error> {
811 let ast = syn::parse_with_capabilities(txt, &self.capabilities)?;
813 self.process(ast, sess, vars).await
815 }
816
817 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
818 pub async fn execute_import<S>(
819 &self,
820 sess: &Session,
821 vars: Variables,
822 query: S,
823 ) -> Result<Vec<Response>, Error>
824 where
825 S: Stream<Item = Result<Bytes, Error>>,
826 {
827 if sess.expired() {
829 return Err(Error::ExpiredSession);
830 }
831
832 self.check_anon(sess).map_err(|_| IamError::NotAllowed {
835 actor: "anonymous".to_string(),
836 action: "process".to_string(),
837 resource: "query".to_string(),
838 })?;
839
840 let opt = self.setup_options(sess);
842
843 let mut ctx = self.setup_ctx()?;
845 sess.context(&mut ctx);
847 vars.attach(&mut ctx)?;
849 let parser_settings = ParserSettings {
852 references_enabled: ctx
853 .get_capabilities()
854 .allows_experimental(&ExperimentalTarget::RecordReferences),
855 bearer_access_enabled: ctx
856 .get_capabilities()
857 .allows_experimental(&ExperimentalTarget::BearerAccess),
858 define_api_enabled: ctx
859 .get_capabilities()
860 .allows_experimental(&ExperimentalTarget::DefineApi),
861 ..Default::default()
862 };
863 let mut statements_stream = StatementStream::new_with_settings(parser_settings);
864 let mut buffer = BytesMut::new();
865 let mut parse_size = 4096;
866 let mut bytes_stream = pin!(query);
867 let mut complete = false;
868 let mut filling = true;
869
870 let stream = futures::stream::poll_fn(move |cx| loop {
871 while filling {
873 let bytes = ready!(bytes_stream.as_mut().poll_next(cx));
874 let bytes = match bytes {
875 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
876 Some(Ok(x)) => x,
877 None => {
878 complete = true;
879 filling = false;
880 break;
881 }
882 };
883
884 buffer.extend_from_slice(&bytes);
885 filling = buffer.len() < parse_size
886 }
887
888 if complete {
891 return match statements_stream.parse_complete(&mut buffer) {
892 Err(e) => Poll::Ready(Some(Err(Error::InvalidQuery(e)))),
893 Ok(None) => Poll::Ready(None),
894 Ok(Some(x)) => Poll::Ready(Some(Ok(x))),
895 };
896 }
897
898 match statements_stream.parse_partial(&mut buffer) {
900 Err(e) => return Poll::Ready(Some(Err(Error::InvalidQuery(e)))),
901 Ok(Some(x)) => return Poll::Ready(Some(Ok(x))),
902 Ok(None) => {
903 if buffer.len() >= parse_size && parse_size < u32::MAX as usize {
905 parse_size = (parse_size + 1).next_power_of_two();
909 }
910 filling = true;
912 }
913 }
914 });
915
916 Executor::execute_stream(self, Arc::new(ctx), opt, stream).await
917 }
918
919 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
937 pub async fn process(
938 &self,
939 ast: Query,
940 sess: &Session,
941 vars: Variables,
942 ) -> Result<Vec<Response>, Error> {
943 if sess.expired() {
945 return Err(Error::ExpiredSession);
946 }
947 self.check_anon(sess).map_err(|_| IamError::NotAllowed {
950 actor: "anonymous".to_string(),
951 action: "process".to_string(),
952 resource: "query".to_string(),
953 })?;
954
955 let opt = self.setup_options(sess);
957
958 let mut ctx = self.setup_ctx()?;
960 sess.context(&mut ctx);
962 vars.attach(&mut ctx)?;
964 Executor::execute(self, ctx.freeze(), opt, ast).await
966 }
967
968 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
987 pub async fn compute(
988 &self,
989 val: Value,
990 sess: &Session,
991 vars: Variables,
992 ) -> Result<Value, Error> {
993 if sess.expired() {
995 return Err(Error::ExpiredSession);
996 }
997 self.check_anon(sess).map_err(|_| IamError::NotAllowed {
1000 actor: "anonymous".to_string(),
1001 action: "compute".to_string(),
1002 resource: "value".to_string(),
1003 })?;
1004
1005 let mut stack = TreeStack::new();
1007 let opt = self.setup_options(sess);
1009 let mut ctx = MutableContext::default();
1011 ctx.add_capabilities(self.capabilities.clone());
1013 if let Some(timeout) = self.query_timeout {
1015 ctx.add_timeout(timeout)?;
1016 }
1017 if let Some(channel) = &self.notification_channel {
1019 ctx.add_notifications(Some(&channel.0));
1020 }
1021 sess.context(&mut ctx);
1023 vars.attach(&mut ctx)?;
1025 let txn = self.transaction(val.writeable().into(), Optimistic).await?.enclose();
1027 ctx.set_transaction(txn.clone());
1029 let ctx = ctx.freeze();
1031 let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await;
1033 match (res.is_ok(), val.writeable()) {
1035 (true, true) => txn.commit().await?,
1037 (_, _) => txn.cancel().await?,
1039 };
1040 res
1042 }
1043
1044 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1067 pub async fn evaluate(
1068 &self,
1069 val: &Value,
1070 sess: &Session,
1071 vars: Variables,
1072 ) -> Result<Value, Error> {
1073 if sess.expired() {
1075 return Err(Error::ExpiredSession);
1076 }
1077 let mut stack = TreeStack::new();
1079 let opt = self.setup_options(sess);
1081 let mut ctx = MutableContext::default();
1083 ctx.add_capabilities(self.capabilities.clone());
1085 if let Some(timeout) = self.query_timeout {
1087 ctx.add_timeout(timeout)?;
1088 }
1089 if let Some(channel) = &self.notification_channel {
1091 ctx.add_notifications(Some(&channel.0));
1092 }
1093 sess.context(&mut ctx);
1095 vars.attach(&mut ctx)?;
1097 let txn = self.transaction(val.writeable().into(), Optimistic).await?.enclose();
1099 ctx.set_transaction(txn.clone());
1101 let ctx = ctx.freeze();
1103 let res = stack.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await;
1105 match (res.is_ok(), val.writeable()) {
1107 (true, true) => txn.commit().await?,
1109 (_, _) => txn.cancel().await?,
1111 };
1112 res
1114 }
1115
1116 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1136 pub fn notifications(&self) -> Option<Receiver<Notification>> {
1137 self.notification_channel.as_ref().map(|v| v.1.clone())
1138 }
1139
1140 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1142 pub async fn import(&self, sql: &str, sess: &Session) -> Result<Vec<Response>, Error> {
1143 if sess.expired() {
1145 return Err(Error::ExpiredSession);
1146 }
1147 self.execute(sql, sess, None).await
1149 }
1150
1151 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1153 pub async fn import_stream<S>(&self, sess: &Session, stream: S) -> Result<Vec<Response>, Error>
1154 where
1155 S: Stream<Item = Result<Bytes, Error>>,
1156 {
1157 if sess.expired() {
1159 return Err(Error::ExpiredSession);
1160 }
1161 self.execute_import(sess, None, stream).await
1163 }
1164
1165 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1167 pub async fn export(
1168 &self,
1169 sess: &Session,
1170 chn: Sender<Vec<u8>>,
1171 ) -> Result<impl Future<Output = Result<(), Error>>, Error> {
1172 let cfg = super::export::Config::default();
1174 self.export_with_config(sess, chn, cfg).await
1175 }
1176
1177 #[instrument(level = "debug", target = "surrealdb::core::kvs::ds", skip_all)]
1179 pub async fn export_with_config(
1180 &self,
1181 sess: &Session,
1182 chn: Sender<Vec<u8>>,
1183 cfg: export::Config,
1184 ) -> Result<impl Future<Output = Result<(), Error>>, Error> {
1185 if sess.expired() {
1187 return Err(Error::ExpiredSession);
1188 }
1189 let (ns, db) = crate::iam::check::check_ns_db(sess)?;
1191 let txn = self.transaction(Read, Optimistic).await?;
1193 Ok(async move {
1195 txn.export(&ns, &db, cfg, chn).await?;
1197 Ok(())
1199 })
1200 }
1201
1202 #[instrument(level = "trace", target = "surrealdb::core::kvs::ds", skip(self, sess))]
1204 pub fn check(&self, sess: &Session, action: Action, resource: Resource) -> Result<(), Error> {
1205 if sess.expired() {
1207 return Err(Error::ExpiredSession);
1208 }
1209 let skip_auth = !self.is_auth_enabled() && sess.au.is_anon();
1211 if !skip_auth {
1212 sess.au.is_allowed(action, &resource)?;
1213 }
1214 Ok(())
1216 }
1217
1218 pub fn setup_options(&self, sess: &Session) -> Options {
1219 Options::default()
1220 .with_id(self.id)
1221 .with_ns(sess.ns())
1222 .with_db(sess.db())
1223 .with_live(sess.live())
1224 .with_auth(sess.au.clone())
1225 .with_strict(self.strict)
1226 .with_auth_enabled(self.auth_enabled)
1227 }
1228 pub fn setup_ctx(&self) -> Result<MutableContext, Error> {
1229 let mut ctx = MutableContext::from_ds(
1230 self.query_timeout,
1231 self.capabilities.clone(),
1232 self.index_stores.clone(),
1233 self.cache.clone(),
1234 #[cfg(not(target_family = "wasm"))]
1235 self.index_builder.clone(),
1236 #[cfg(storage)]
1237 self.temporary_directory.clone(),
1238 )?;
1239 if let Some(channel) = &self.notification_channel {
1241 ctx.add_notifications(Some(&channel.0));
1242 }
1243 Ok(ctx)
1244 }
1245
1246 pub fn check_anon(&self, sess: &Session) -> Result<(), IamError> {
1248 if self.auth_enabled && sess.au.is_anon() && !self.capabilities.allows_guest_access() {
1249 Err(IamError::NotAllowed {
1250 actor: "anonymous".to_string(),
1251 action: String::new(),
1252 resource: String::new(),
1253 })
1254 } else {
1255 Ok(())
1256 }
1257 }
1258}
1259
#[cfg(test)]
mod test {
    use super::*;

    /// Computes a `1 + 1` expression wrapped in 1000 nested futures and
    /// expects the result to be `2`.
    #[tokio::test]
    pub async fn very_deep_query() -> Result<(), Error> {
        use crate::kvs::Datastore;
        use crate::sql::{Expression, Future, Number, Operator, Value};
        use reblessive::{Stack, Stk};

        /// Number of nested futures wrapped around the innermost expression.
        const DEPTH: usize = 1000;

        // Build the nested value on a reblessive stack
        async fn build_query(stk: &mut Stk, depth: usize) -> Value {
            if depth > 0 {
                let inner = stk.run(|stk| build_query(stk, depth - 1)).await;
                return Value::Future(Box::new(Future::from(inner)));
            }
            Value::Expression(Box::new(Expression::Binary {
                l: Value::Number(Number::Int(1)),
                o: Operator::Add,
                r: Value::Number(Number::Int(1)),
            }))
        }
        let mut builder = Stack::new();
        let val = builder.enter(|stk| build_query(stk, DEPTH)).finish();

        let dbs = Datastore::new("memory").await.unwrap().with_capabilities(Capabilities::all());

        let opt = Options::default()
            .with_id(dbs.id)
            .with_ns(Some("test".into()))
            .with_db(Some("test".into()))
            .with_live(false)
            .with_strict(false)
            .with_auth_enabled(false)
            .with_max_computation_depth(u32::MAX)
            .with_futures(true);

        // Set up a context with the capabilities and a transaction
        let mut ctx = MutableContext::default();
        ctx.add_capabilities(dbs.capabilities.clone());
        let txn = dbs.transaction(val.writeable().into(), Optimistic).await?;
        ctx.set_transaction(txn.enclose());
        let ctx = ctx.freeze();
        // Compute the deeply nested value
        let mut runner = reblessive::tree::TreeStack::new();
        let outcome =
            runner.enter(|stk| val.compute(stk, &ctx, &opt, None)).finish().await.unwrap();
        assert_eq!(outcome, Value::Number(Number::Int(2)));
        Ok(())
    }
}