surrealdb_core/ctx/
context.rs

1use crate::cnf::PROTECTED_PARAM_NAMES;
2use crate::ctx::canceller::Canceller;
3use crate::ctx::reason::Reason;
4#[cfg(feature = "http")]
5use crate::dbs::capabilities::NetTarget;
6use crate::dbs::{Capabilities, Notification};
7use crate::err::Error;
8use crate::idx::planner::executor::QueryExecutor;
9use crate::idx::planner::{IterationStage, QueryPlanner};
10use crate::idx::trees::store::IndexStores;
11use crate::kvs::cache::ds::DatastoreCache;
12#[cfg(not(target_family = "wasm"))]
13use crate::kvs::IndexBuilder;
14use crate::kvs::Transaction;
15use crate::mem::ALLOC;
16use crate::sql::value::Value;
17use async_channel::Sender;
18use std::borrow::Cow;
19use std::collections::HashMap;
20use std::fmt::{self, Debug};
21#[cfg(storage)]
22use std::path::PathBuf;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::Arc;
25use std::time::Duration;
26use trice::Instant;
27#[cfg(feature = "http")]
28use url::Url;
29
/// A frozen context: a [`MutableContext`] shared behind an [`Arc`], as
/// produced by [`MutableContext::freeze`].
pub type Context = Arc<MutableContext>;
31
/// The editable form of an execution context.
///
/// It carries the deadline, the cancellation flag, the stored values and the
/// shared handles (notifications, planner, executor, caches, index stores,
/// capabilities, transaction) that child contexts copy from their parent.
#[non_exhaustive]
pub struct MutableContext {
	// An optional parent context.
	parent: Option<Context>,
	// An optional deadline.
	deadline: Option<Instant>,
	// Whether or not this context is cancelled.
	cancelled: Arc<AtomicBool>,
	// A collection of read only values stored in this context.
	values: HashMap<Cow<'static, str>, Arc<Value>>,
	// Stores the notification channel if available
	notifications: Option<Sender<Notification>>,
	// An optional query planner
	query_planner: Option<Arc<QueryPlanner>>,
	// An optional query executor
	query_executor: Option<QueryExecutor>,
	// An optional iteration stage
	iteration_stage: Option<IterationStage>,
	// An optional datastore cache
	cache: Option<Arc<DatastoreCache>>,
	// The index store
	index_stores: IndexStores,
	// The index concurrent builders
	#[cfg(not(target_family = "wasm"))]
	index_builder: Option<IndexBuilder>,
	// The capabilities consulted by the `check_allowed_*` methods
	capabilities: Arc<Capabilities>,
	#[cfg(storage)]
	// The temporary directory
	temporary_directory: Option<Arc<PathBuf>>,
	// An optional transaction
	transaction: Option<Arc<Transaction>>,
	// When true, a lookup that misses in `values` does not fall back to the
	// parent, except for keys listed in `PROTECTED_PARAM_NAMES`.
	isolated: bool,
}
67
68impl Default for MutableContext {
69	fn default() -> Self {
70		MutableContext::background()
71	}
72}
73
74impl From<Transaction> for MutableContext {
75	fn from(txn: Transaction) -> Self {
76		let mut ctx = MutableContext::background();
77		ctx.set_transaction(Arc::new(txn));
78		ctx
79	}
80}
81
82impl Debug for MutableContext {
83	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
84		f.debug_struct("Context")
85			.field("parent", &self.parent)
86			.field("deadline", &self.deadline)
87			.field("cancelled", &self.cancelled)
88			.field("values", &self.values)
89			.finish()
90	}
91}
92
93impl MutableContext {
94	/// Creates a new empty background context.
95	pub(crate) fn background() -> Self {
96		Self {
97			values: HashMap::default(),
98			parent: None,
99			deadline: None,
100			cancelled: Arc::new(AtomicBool::new(false)),
101			notifications: None,
102			query_planner: None,
103			query_executor: None,
104			iteration_stage: None,
105			capabilities: Arc::new(Capabilities::default()),
106			index_stores: IndexStores::default(),
107			cache: None,
108			#[cfg(not(target_family = "wasm"))]
109			index_builder: None,
110			#[cfg(storage)]
111			temporary_directory: None,
112			transaction: None,
113			isolated: false,
114		}
115	}
116
117	/// Creates a new context from a frozen parent context.
118	pub(crate) fn new(parent: &Context) -> Self {
119		MutableContext {
120			values: HashMap::default(),
121			deadline: parent.deadline,
122			cancelled: Arc::new(AtomicBool::new(false)),
123			notifications: parent.notifications.clone(),
124			query_planner: parent.query_planner.clone(),
125			query_executor: parent.query_executor.clone(),
126			iteration_stage: parent.iteration_stage.clone(),
127			capabilities: parent.capabilities.clone(),
128			index_stores: parent.index_stores.clone(),
129			cache: parent.cache.clone(),
130			#[cfg(not(target_family = "wasm"))]
131			index_builder: parent.index_builder.clone(),
132			#[cfg(storage)]
133			temporary_directory: parent.temporary_directory.clone(),
134			transaction: parent.transaction.clone(),
135			isolated: false,
136			parent: Some(parent.clone()),
137		}
138	}
139
140	/// Create a new context from a frozen parent context.
141	/// This context is isolated, and values specified on
142	/// any parent contexts will not be accessible.
143	pub(crate) fn new_isolated(parent: &Context) -> Self {
144		Self {
145			values: HashMap::default(),
146			deadline: parent.deadline,
147			cancelled: Arc::new(AtomicBool::new(false)),
148			notifications: parent.notifications.clone(),
149			query_planner: parent.query_planner.clone(),
150			query_executor: parent.query_executor.clone(),
151			iteration_stage: parent.iteration_stage.clone(),
152			capabilities: parent.capabilities.clone(),
153			index_stores: parent.index_stores.clone(),
154			cache: parent.cache.clone(),
155			#[cfg(not(target_family = "wasm"))]
156			index_builder: parent.index_builder.clone(),
157			#[cfg(storage)]
158			temporary_directory: parent.temporary_directory.clone(),
159			transaction: parent.transaction.clone(),
160			isolated: true,
161			parent: Some(parent.clone()),
162		}
163	}
164
165	/// Create a new context from a frozen parent context.
166	/// This context is not linked to the parent context,
167	/// and won't be cancelled if the parent is cancelled.
168	#[cfg(not(target_family = "wasm"))]
169	pub(crate) fn new_concurrent(from: &Context) -> Self {
170		Self {
171			values: HashMap::default(),
172			deadline: None,
173			cancelled: Arc::new(AtomicBool::new(false)),
174			notifications: from.notifications.clone(),
175			query_planner: from.query_planner.clone(),
176			query_executor: from.query_executor.clone(),
177			iteration_stage: from.iteration_stage.clone(),
178			capabilities: from.capabilities.clone(),
179			index_stores: from.index_stores.clone(),
180			cache: from.cache.clone(),
181			index_builder: from.index_builder.clone(),
182			#[cfg(storage)]
183			temporary_directory: from.temporary_directory.clone(),
184			transaction: None,
185			isolated: false,
186			parent: None,
187		}
188	}
189
190	/// Creates a new context from a configured datastore.
191	pub(crate) fn from_ds(
192		time_out: Option<Duration>,
193		capabilities: Arc<Capabilities>,
194		index_stores: IndexStores,
195		cache: Arc<DatastoreCache>,
196		#[cfg(not(target_family = "wasm"))] index_builder: IndexBuilder,
197		#[cfg(storage)] temporary_directory: Option<Arc<PathBuf>>,
198	) -> Result<MutableContext, Error> {
199		let mut ctx = Self {
200			values: HashMap::default(),
201			parent: None,
202			deadline: None,
203			cancelled: Arc::new(AtomicBool::new(false)),
204			notifications: None,
205			query_planner: None,
206			query_executor: None,
207			iteration_stage: None,
208			capabilities,
209			index_stores,
210			cache: Some(cache),
211			#[cfg(not(target_family = "wasm"))]
212			index_builder: Some(index_builder),
213			#[cfg(storage)]
214			temporary_directory,
215			transaction: None,
216			isolated: false,
217		};
218		if let Some(timeout) = time_out {
219			ctx.add_timeout(timeout)?;
220		}
221		Ok(ctx)
222	}
223
224	/// Freezes this context, allowing it to be used as a parent context.
225	pub(crate) fn freeze(self) -> Context {
226		Arc::new(self)
227	}
228
229	/// Unfreezes this context, allowing it to be edited and configured.
230	pub(crate) fn unfreeze(ctx: Context) -> Result<MutableContext, Error> {
231		Arc::into_inner(ctx)
232			.ok_or_else(|| fail!("Tried to unfreeze a Context with multiple references"))
233	}
234
235	/// Add a value to the context. It overwrites any previously set values
236	/// with the same key.
237	pub(crate) fn add_value<K>(&mut self, key: K, value: Arc<Value>)
238	where
239		K: Into<Cow<'static, str>>,
240	{
241		self.values.insert(key.into(), value);
242	}
243
244	/// Add a value to the context. It overwrites any previously set values
245	/// with the same key.
246	pub(crate) fn add_values<T, K, V>(&mut self, iter: T)
247	where
248		T: IntoIterator<Item = (K, V)>,
249		K: Into<Cow<'static, str>>,
250		V: Into<Arc<Value>>,
251	{
252		self.values.extend(iter.into_iter().map(|(k, v)| (k.into(), v.into())))
253	}
254
255	/// Add cancellation to the context. The value that is returned will cancel
256	/// the context and it's children once called.
257	pub(crate) fn add_cancel(&mut self) -> Canceller {
258		let cancelled = self.cancelled.clone();
259		Canceller::new(cancelled)
260	}
261
262	/// Add a deadline to the context. If the current deadline is sooner than
263	/// the provided deadline, this method does nothing.
264	pub(crate) fn add_deadline(&mut self, deadline: Instant) {
265		match self.deadline {
266			Some(current) if current < deadline => (),
267			_ => self.deadline = Some(deadline),
268		}
269	}
270
271	/// Add a timeout to the context. If the current timeout is sooner than
272	/// the provided timeout, this method does nothing. If the result of the
273	/// addition causes an overflow, this method returns an error.
274	pub(crate) fn add_timeout(&mut self, timeout: Duration) -> Result<(), Error> {
275		match Instant::now().checked_add(timeout) {
276			Some(deadline) => {
277				self.add_deadline(deadline);
278				Ok(())
279			}
280			None => Err(Error::InvalidTimeout(timeout.as_secs())),
281		}
282	}
283
284	/// Add the LIVE query notification channel to the context, so that we
285	/// can send notifications to any subscribers.
286	pub(crate) fn add_notifications(&mut self, chn: Option<&Sender<Notification>>) {
287		self.notifications = chn.cloned()
288	}
289
290	pub(crate) fn set_query_planner(&mut self, qp: QueryPlanner) {
291		self.query_planner = Some(Arc::new(qp));
292	}
293
294	pub(crate) fn set_query_executor(&mut self, qe: QueryExecutor) {
295		self.query_executor = Some(qe);
296	}
297
298	pub(crate) fn set_iteration_stage(&mut self, is: IterationStage) {
299		self.iteration_stage = Some(is);
300	}
301
302	pub(crate) fn set_transaction(&mut self, txn: Arc<Transaction>) {
303		self.transaction = Some(txn);
304	}
305
306	pub(crate) fn tx(&self) -> Arc<Transaction> {
307		self.transaction
308			.clone()
309			.unwrap_or_else(|| unreachable!("The context was not associated with a transaction"))
310	}
311
312	/// Get the timeout for this operation, if any. This is useful for
313	/// checking if a long job should be started or not.
314	pub(crate) fn timeout(&self) -> Option<Duration> {
315		self.deadline.map(|v| v.saturating_duration_since(Instant::now()))
316	}
317
318	pub(crate) fn notifications(&self) -> Option<Sender<Notification>> {
319		self.notifications.clone()
320	}
321
322	pub(crate) fn has_notifications(&self) -> bool {
323		self.notifications.is_some()
324	}
325
326	pub(crate) fn get_query_planner(&self) -> Option<&QueryPlanner> {
327		self.query_planner.as_ref().map(|qp| qp.as_ref())
328	}
329
330	pub(crate) fn get_query_executor(&self) -> Option<&QueryExecutor> {
331		self.query_executor.as_ref()
332	}
333
334	pub(crate) fn get_iteration_stage(&self) -> Option<&IterationStage> {
335		self.iteration_stage.as_ref()
336	}
337
338	/// Get the index_store for this context/ds
339	pub(crate) fn get_index_stores(&self) -> &IndexStores {
340		&self.index_stores
341	}
342
343	/// Get the index_builder for this context/ds
344	#[cfg(not(target_family = "wasm"))]
345	pub(crate) fn get_index_builder(&self) -> Option<&IndexBuilder> {
346		self.index_builder.as_ref()
347	}
348
349	// Get the current datastore cache
350	pub(crate) fn get_cache(&self) -> Option<Arc<DatastoreCache>> {
351		self.cache.clone()
352	}
353
354	/// Check if the context is done. If it returns `None` the operation may
355	/// proceed, otherwise the operation should be stopped.
356	/// Note regarding `deep_check`:
357	/// Checking Instant::now() takes tens to hundreds of nanoseconds
358	/// Checking an AtomicBool takes a single-digit nanoseconds.
359	/// We may not want to check for the deadline on every call.
360	/// An iteration loop may want to check it every 10 or 100 calls.
361	/// Eg.: ctx.done(count % 100 == 0)
362	pub(crate) fn done(&self, deep_check: bool) -> Result<Option<Reason>, Error> {
363		match self.deadline {
364			Some(deadline) if deep_check && deadline <= Instant::now() => {
365				Ok(Some(Reason::Timedout))
366			}
367			_ if self.cancelled.load(Ordering::Relaxed) => Ok(Some(Reason::Canceled)),
368			_ => {
369				if deep_check && ALLOC.is_beyond_threshold() {
370					return Err(Error::QueryBeyondMemoryThreshold);
371				}
372				match &self.parent {
373					Some(ctx) => ctx.done(deep_check),
374					_ => Ok(None),
375				}
376			}
377		}
378	}
379
380	/// Check if the context is ok to continue.
381	pub(crate) fn is_ok(&self, deep_check: bool) -> Result<bool, Error> {
382		Ok(self.done(deep_check)?.is_none())
383	}
384
385	/// Check if the context is not ok to continue.
386	pub(crate) fn is_done(&self, deep_check: bool) -> Result<bool, Error> {
387		Ok(self.done(deep_check)?.is_some())
388	}
389
390	/// Check if the context is not ok to continue, because it timed out.
391	pub(crate) fn is_timedout(&self) -> Result<bool, Error> {
392		Ok(matches!(self.done(true)?, Some(Reason::Timedout)))
393	}
394
395	#[cfg(storage)]
396	/// Return the location of the temporary directory if any
397	pub(crate) fn temporary_directory(&self) -> Option<&Arc<PathBuf>> {
398		self.temporary_directory.as_ref()
399	}
400
401	/// Get a value from the context. If no value is stored under the
402	/// provided key, then this will return None.
403	pub(crate) fn value(&self, key: &str) -> Option<&Value> {
404		match self.values.get(key) {
405			Some(v) => Some(v.as_ref()),
406			None if PROTECTED_PARAM_NAMES.contains(&key) || !self.isolated => match &self.parent {
407				Some(p) => p.value(key),
408				_ => None,
409			},
410			None => None,
411		}
412	}
413
414	/// Get a 'static view into the cancellation status.
415	#[cfg(feature = "scripting")]
416	pub(crate) fn cancellation(&self) -> crate::ctx::cancellation::Cancellation {
417		crate::ctx::cancellation::Cancellation::new(
418			self.deadline,
419			std::iter::successors(Some(self), |ctx| ctx.parent.as_ref().map(|c| c.as_ref()))
420				.map(|ctx| ctx.cancelled.clone())
421				.collect(),
422		)
423	}
424
425	//
426	// Capabilities
427	//
428
429	/// Set the capabilities for this context
430	pub(crate) fn add_capabilities(&mut self, caps: Arc<Capabilities>) {
431		self.capabilities = caps;
432	}
433
434	/// Get the capabilities for this context
435	#[allow(dead_code)]
436	pub(crate) fn get_capabilities(&self) -> Arc<Capabilities> {
437		self.capabilities.clone()
438	}
439
440	/// Check if scripting is allowed
441	#[allow(dead_code)]
442	pub(crate) fn check_allowed_scripting(&self) -> Result<(), Error> {
443		if !self.capabilities.allows_scripting() {
444			warn!("Capabilities denied scripting attempt");
445			return Err(Error::ScriptingNotAllowed);
446		}
447		trace!("Capabilities allowed scripting");
448		Ok(())
449	}
450
451	/// Check if a function is allowed
452	pub(crate) fn check_allowed_function(&self, target: &str) -> Result<(), Error> {
453		if !self.capabilities.allows_function_name(target) {
454			warn!("Capabilities denied function execution attempt, target: '{target}'");
455			return Err(Error::FunctionNotAllowed(target.to_string()));
456		}
457		trace!("Capabilities allowed function execution, target: '{target}'");
458		Ok(())
459	}
460
461	/// Check if a network target is allowed
462	#[cfg(feature = "http")]
463	pub(crate) fn check_allowed_net(&self, url: &Url) -> Result<(), Error> {
464		match url.host() {
465			Some(host) => {
466				let target = &NetTarget::Host(host.to_owned(), url.port_or_known_default());
467				if !self.capabilities.allows_network_target(target) {
468					warn!(
469						"Capabilities denied outgoing network connection attempt, target: '{target}'"
470					);
471					return Err(Error::NetTargetNotAllowed(target.to_string()));
472				}
473				trace!("Capabilities allowed outgoing network connection, target: '{target}'");
474				Ok(())
475			}
476			_ => Err(Error::InvalidUrl(url.to_string())),
477		}
478	}
479}