surrealdb_core/ctx/
context.rs

1use crate::cnf::PROTECTED_PARAM_NAMES;
2use crate::ctx::canceller::Canceller;
3use crate::ctx::reason::Reason;
4#[cfg(feature = "http")]
5use crate::dbs::capabilities::NetTarget;
6use crate::dbs::{Capabilities, Notification};
7use crate::err::Error;
8use crate::idx::planner::executor::QueryExecutor;
9use crate::idx::planner::{IterationStage, QueryPlanner};
10use crate::idx::trees::store::IndexStores;
11use crate::kvs::cache::ds::DatastoreCache;
12#[cfg(not(target_family = "wasm"))]
13use crate::kvs::IndexBuilder;
14use crate::kvs::Transaction;
15use crate::sql::value::Value;
16use async_channel::Sender;
17use std::borrow::Cow;
18use std::collections::HashMap;
19use std::fmt::{self, Debug};
20#[cfg(storage)]
21use std::path::PathBuf;
22use std::sync::atomic::{AtomicBool, Ordering};
23use std::sync::Arc;
24use std::time::Duration;
25use trice::Instant;
26#[cfg(feature = "http")]
27use url::Url;
28
29pub type Context = Arc<MutableContext>;
30
31#[non_exhaustive]
32pub struct MutableContext {
33	// An optional parent context.
34	parent: Option<Context>,
35	// An optional deadline.
36	deadline: Option<Instant>,
37	// Whether or not this context is cancelled.
38	cancelled: Arc<AtomicBool>,
39	// A collection of read only values stored in this context.
40	values: HashMap<Cow<'static, str>, Arc<Value>>,
41	// Stores the notification channel if available
42	notifications: Option<Sender<Notification>>,
43	// An optional query planner
44	query_planner: Option<Arc<QueryPlanner>>,
45	// An optional query executor
46	query_executor: Option<QueryExecutor>,
47	// An optional iteration stage
48	iteration_stage: Option<IterationStage>,
49	// An optional datastore cache
50	cache: Option<Arc<DatastoreCache>>,
51	// The index store
52	index_stores: IndexStores,
53	// The index concurrent builders
54	#[cfg(not(target_family = "wasm"))]
55	index_builder: Option<IndexBuilder>,
56	// Capabilities
57	capabilities: Arc<Capabilities>,
58	#[cfg(storage)]
59	// The temporary directory
60	temporary_directory: Option<Arc<PathBuf>>,
61	// An optional transaction
62	transaction: Option<Arc<Transaction>>,
63	// Does not read from parent `values`.
64	isolated: bool,
65}
66
67impl Default for MutableContext {
68	fn default() -> Self {
69		MutableContext::background()
70	}
71}
72
73impl From<Transaction> for MutableContext {
74	fn from(txn: Transaction) -> Self {
75		let mut ctx = MutableContext::background();
76		ctx.set_transaction(Arc::new(txn));
77		ctx
78	}
79}
80
81impl Debug for MutableContext {
82	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83		f.debug_struct("Context")
84			.field("parent", &self.parent)
85			.field("deadline", &self.deadline)
86			.field("cancelled", &self.cancelled)
87			.field("values", &self.values)
88			.finish()
89	}
90}
91
92impl MutableContext {
93	/// Creates a new empty background context.
94	pub(crate) fn background() -> Self {
95		Self {
96			values: HashMap::default(),
97			parent: None,
98			deadline: None,
99			cancelled: Arc::new(AtomicBool::new(false)),
100			notifications: None,
101			query_planner: None,
102			query_executor: None,
103			iteration_stage: None,
104			capabilities: Arc::new(Capabilities::default()),
105			index_stores: IndexStores::default(),
106			cache: None,
107			#[cfg(not(target_family = "wasm"))]
108			index_builder: None,
109			#[cfg(storage)]
110			temporary_directory: None,
111			transaction: None,
112			isolated: false,
113		}
114	}
115
116	/// Creates a new context from a frozen parent context.
117	pub(crate) fn new(parent: &Context) -> Self {
118		MutableContext {
119			values: HashMap::default(),
120			deadline: parent.deadline,
121			cancelled: Arc::new(AtomicBool::new(false)),
122			notifications: parent.notifications.clone(),
123			query_planner: parent.query_planner.clone(),
124			query_executor: parent.query_executor.clone(),
125			iteration_stage: parent.iteration_stage.clone(),
126			capabilities: parent.capabilities.clone(),
127			index_stores: parent.index_stores.clone(),
128			cache: parent.cache.clone(),
129			#[cfg(not(target_family = "wasm"))]
130			index_builder: parent.index_builder.clone(),
131			#[cfg(storage)]
132			temporary_directory: parent.temporary_directory.clone(),
133			transaction: parent.transaction.clone(),
134			isolated: false,
135			parent: Some(parent.clone()),
136		}
137	}
138
139	/// Create a new context from a frozen parent context.
140	/// This context is isolated, and values specified on
141	/// any parent contexts will not be accessible.
142	pub(crate) fn new_isolated(parent: &Context) -> Self {
143		Self {
144			values: HashMap::default(),
145			deadline: parent.deadline,
146			cancelled: Arc::new(AtomicBool::new(false)),
147			notifications: parent.notifications.clone(),
148			query_planner: parent.query_planner.clone(),
149			query_executor: parent.query_executor.clone(),
150			iteration_stage: parent.iteration_stage.clone(),
151			capabilities: parent.capabilities.clone(),
152			index_stores: parent.index_stores.clone(),
153			cache: parent.cache.clone(),
154			#[cfg(not(target_family = "wasm"))]
155			index_builder: parent.index_builder.clone(),
156			#[cfg(storage)]
157			temporary_directory: parent.temporary_directory.clone(),
158			transaction: parent.transaction.clone(),
159			isolated: true,
160			parent: Some(parent.clone()),
161		}
162	}
163
164	/// Create a new context from a frozen parent context.
165	/// This context is not linked to the parent context,
166	/// and won't be cancelled if the parent is cancelled.
167	#[cfg(not(target_family = "wasm"))]
168	pub(crate) fn new_concurrent(from: &Context) -> Self {
169		Self {
170			values: HashMap::default(),
171			deadline: None,
172			cancelled: Arc::new(AtomicBool::new(false)),
173			notifications: from.notifications.clone(),
174			query_planner: from.query_planner.clone(),
175			query_executor: from.query_executor.clone(),
176			iteration_stage: from.iteration_stage.clone(),
177			capabilities: from.capabilities.clone(),
178			index_stores: from.index_stores.clone(),
179			cache: from.cache.clone(),
180			index_builder: from.index_builder.clone(),
181			#[cfg(storage)]
182			temporary_directory: from.temporary_directory.clone(),
183			transaction: None,
184			isolated: false,
185			parent: None,
186		}
187	}
188
189	/// Creates a new context from a configured datastore.
190	pub(crate) fn from_ds(
191		time_out: Option<Duration>,
192		capabilities: Arc<Capabilities>,
193		index_stores: IndexStores,
194		cache: Arc<DatastoreCache>,
195		#[cfg(not(target_family = "wasm"))] index_builder: IndexBuilder,
196		#[cfg(storage)] temporary_directory: Option<Arc<PathBuf>>,
197	) -> Result<MutableContext, Error> {
198		let mut ctx = Self {
199			values: HashMap::default(),
200			parent: None,
201			deadline: None,
202			cancelled: Arc::new(AtomicBool::new(false)),
203			notifications: None,
204			query_planner: None,
205			query_executor: None,
206			iteration_stage: None,
207			capabilities,
208			index_stores,
209			cache: Some(cache),
210			#[cfg(not(target_family = "wasm"))]
211			index_builder: Some(index_builder),
212			#[cfg(storage)]
213			temporary_directory,
214			transaction: None,
215			isolated: false,
216		};
217		if let Some(timeout) = time_out {
218			ctx.add_timeout(timeout)?;
219		}
220		Ok(ctx)
221	}
222
223	/// Freezes this context, allowing it to be used as a parent context.
224	pub(crate) fn freeze(self) -> Context {
225		Arc::new(self)
226	}
227
228	/// Unfreezes this context, allowing it to be edited and configured.
229	pub(crate) fn unfreeze(ctx: Context) -> Result<MutableContext, Error> {
230		Arc::into_inner(ctx)
231			.ok_or_else(|| fail!("Tried to unfreeze a Context with multiple references"))
232	}
233
234	/// Add a value to the context. It overwrites any previously set values
235	/// with the same key.
236	pub(crate) fn add_value<K>(&mut self, key: K, value: Arc<Value>)
237	where
238		K: Into<Cow<'static, str>>,
239	{
240		self.values.insert(key.into(), value);
241	}
242
243	/// Add a value to the context. It overwrites any previously set values
244	/// with the same key.
245	pub(crate) fn add_values<T, K, V>(&mut self, iter: T)
246	where
247		T: IntoIterator<Item = (K, V)>,
248		K: Into<Cow<'static, str>>,
249		V: Into<Arc<Value>>,
250	{
251		self.values.extend(iter.into_iter().map(|(k, v)| (k.into(), v.into())))
252	}
253
254	/// Add cancellation to the context. The value that is returned will cancel
255	/// the context and it's children once called.
256	pub(crate) fn add_cancel(&mut self) -> Canceller {
257		let cancelled = self.cancelled.clone();
258		Canceller::new(cancelled)
259	}
260
261	/// Add a deadline to the context. If the current deadline is sooner than
262	/// the provided deadline, this method does nothing.
263	pub(crate) fn add_deadline(&mut self, deadline: Instant) {
264		match self.deadline {
265			Some(current) if current < deadline => (),
266			_ => self.deadline = Some(deadline),
267		}
268	}
269
270	/// Add a timeout to the context. If the current timeout is sooner than
271	/// the provided timeout, this method does nothing. If the result of the
272	/// addition causes an overflow, this method returns an error.
273	pub(crate) fn add_timeout(&mut self, timeout: Duration) -> Result<(), Error> {
274		match Instant::now().checked_add(timeout) {
275			Some(deadline) => {
276				self.add_deadline(deadline);
277				Ok(())
278			}
279			None => Err(Error::InvalidTimeout(timeout.as_secs())),
280		}
281	}
282
283	/// Add the LIVE query notification channel to the context, so that we
284	/// can send notifications to any subscribers.
285	pub(crate) fn add_notifications(&mut self, chn: Option<&Sender<Notification>>) {
286		self.notifications = chn.cloned()
287	}
288
289	pub(crate) fn set_query_planner(&mut self, qp: QueryPlanner) {
290		self.query_planner = Some(Arc::new(qp));
291	}
292
293	pub(crate) fn set_query_executor(&mut self, qe: QueryExecutor) {
294		self.query_executor = Some(qe);
295	}
296
297	pub(crate) fn set_iteration_stage(&mut self, is: IterationStage) {
298		self.iteration_stage = Some(is);
299	}
300
301	pub(crate) fn set_transaction(&mut self, txn: Arc<Transaction>) {
302		self.transaction = Some(txn);
303	}
304
305	pub(crate) fn tx(&self) -> Arc<Transaction> {
306		self.transaction
307			.clone()
308			.unwrap_or_else(|| unreachable!("The context was not associated with a transaction"))
309	}
310
311	/// Get the timeout for this operation, if any. This is useful for
312	/// checking if a long job should be started or not.
313	pub(crate) fn timeout(&self) -> Option<Duration> {
314		self.deadline.map(|v| v.saturating_duration_since(Instant::now()))
315	}
316
317	pub(crate) fn notifications(&self) -> Option<Sender<Notification>> {
318		self.notifications.clone()
319	}
320
321	pub(crate) fn has_notifications(&self) -> bool {
322		self.notifications.is_some()
323	}
324
325	pub(crate) fn get_query_planner(&self) -> Option<&QueryPlanner> {
326		self.query_planner.as_ref().map(|qp| qp.as_ref())
327	}
328
329	pub(crate) fn get_query_executor(&self) -> Option<&QueryExecutor> {
330		self.query_executor.as_ref()
331	}
332
333	pub(crate) fn get_iteration_stage(&self) -> Option<&IterationStage> {
334		self.iteration_stage.as_ref()
335	}
336
337	/// Get the index_store for this context/ds
338	pub(crate) fn get_index_stores(&self) -> &IndexStores {
339		&self.index_stores
340	}
341
342	/// Get the index_builder for this context/ds
343	#[cfg(not(target_family = "wasm"))]
344	pub(crate) fn get_index_builder(&self) -> Option<&IndexBuilder> {
345		self.index_builder.as_ref()
346	}
347
348	// Get the current datastore cache
349	pub(crate) fn get_cache(&self) -> Option<Arc<DatastoreCache>> {
350		self.cache.clone()
351	}
352
353	/// Check if the context is done. If it returns `None` the operation may
354	/// proceed, otherwise the operation should be stopped.
355	/// Note regarding `check_deadline`:
356	/// Checking Instant::now() takes tens to hundreds of nanoseconds
357	/// Checking an AtomicBool takes a single-digit nanoseconds.
358	/// We may not want to check for the deadline on every call.
359	/// An iteration loop may want to check it every 10 or 100 calls. Eg.:
360	/// ctx.done(count % 100 == 0)
361	pub(crate) fn done(&self, check_deadline: bool) -> Option<Reason> {
362		match self.deadline {
363			Some(deadline) if check_deadline && deadline <= Instant::now() => {
364				Some(Reason::Timedout)
365			}
366			_ if self.cancelled.load(Ordering::Relaxed) => Some(Reason::Canceled),
367			_ => match &self.parent {
368				Some(ctx) => ctx.done(check_deadline),
369				_ => None,
370			},
371		}
372	}
373
374	/// Check if the context is ok to continue.
375	pub(crate) fn is_ok(&self, check_deadline: bool) -> bool {
376		self.done(check_deadline).is_none()
377	}
378
379	/// Check if the context is not ok to continue.
380	pub(crate) fn is_done(&self, check_deadline: bool) -> bool {
381		self.done(check_deadline).is_some()
382	}
383
384	/// Check if the context is not ok to continue, because it timed out.
385	pub(crate) fn is_timedout(&self) -> bool {
386		matches!(self.done(true), Some(Reason::Timedout))
387	}
388
389	#[cfg(storage)]
390	/// Return the location of the temporary directory if any
391	pub(crate) fn temporary_directory(&self) -> Option<&Arc<PathBuf>> {
392		self.temporary_directory.as_ref()
393	}
394
395	/// Get a value from the context. If no value is stored under the
396	/// provided key, then this will return None.
397	pub(crate) fn value(&self, key: &str) -> Option<&Value> {
398		match self.values.get(key) {
399			Some(v) => Some(v.as_ref()),
400			None if PROTECTED_PARAM_NAMES.contains(&key) || !self.isolated => match &self.parent {
401				Some(p) => p.value(key),
402				_ => None,
403			},
404			None => None,
405		}
406	}
407
408	/// Get a 'static view into the cancellation status.
409	#[cfg(feature = "scripting")]
410	pub(crate) fn cancellation(&self) -> crate::ctx::cancellation::Cancellation {
411		crate::ctx::cancellation::Cancellation::new(
412			self.deadline,
413			std::iter::successors(Some(self), |ctx| ctx.parent.as_ref().map(|c| c.as_ref()))
414				.map(|ctx| ctx.cancelled.clone())
415				.collect(),
416		)
417	}
418
419	//
420	// Capabilities
421	//
422
423	/// Set the capabilities for this context
424	pub(crate) fn add_capabilities(&mut self, caps: Arc<Capabilities>) {
425		self.capabilities = caps;
426	}
427
428	/// Get the capabilities for this context
429	#[allow(dead_code)]
430	pub(crate) fn get_capabilities(&self) -> Arc<Capabilities> {
431		self.capabilities.clone()
432	}
433
434	/// Check if scripting is allowed
435	#[allow(dead_code)]
436	pub(crate) fn check_allowed_scripting(&self) -> Result<(), Error> {
437		if !self.capabilities.allows_scripting() {
438			warn!("Capabilities denied scripting attempt");
439			return Err(Error::ScriptingNotAllowed);
440		}
441		trace!("Capabilities allowed scripting");
442		Ok(())
443	}
444
445	/// Check if a function is allowed
446	pub(crate) fn check_allowed_function(&self, target: &str) -> Result<(), Error> {
447		if !self.capabilities.allows_function_name(target) {
448			warn!("Capabilities denied function execution attempt, target: '{target}'");
449			return Err(Error::FunctionNotAllowed(target.to_string()));
450		}
451		trace!("Capabilities allowed function execution, target: '{target}'");
452		Ok(())
453	}
454
455	/// Check if a network target is allowed
456	#[cfg(feature = "http")]
457	pub(crate) fn check_allowed_net(&self, url: &Url) -> Result<(), Error> {
458		match url.host() {
459			Some(host) => {
460				let target = &NetTarget::Host(host.to_owned(), url.port_or_known_default());
461				if !self.capabilities.allows_network_target(target) {
462					warn!(
463						"Capabilities denied outgoing network connection attempt, target: '{target}'"
464					);
465					return Err(Error::NetTargetNotAllowed(target.to_string()));
466				}
467				trace!("Capabilities allowed outgoing network connection, target: '{target}'");
468				Ok(())
469			}
470			_ => Err(Error::InvalidUrl(url.to_string())),
471		}
472	}
473}