1use crate::cnf::PROTECTED_PARAM_NAMES;
2use crate::ctx::canceller::Canceller;
3use crate::ctx::reason::Reason;
4#[cfg(feature = "http")]
5use crate::dbs::capabilities::NetTarget;
6use crate::dbs::{Capabilities, Notification};
7use crate::err::Error;
8use crate::idx::planner::executor::QueryExecutor;
9use crate::idx::planner::{IterationStage, QueryPlanner};
10use crate::idx::trees::store::IndexStores;
11use crate::kvs::cache::ds::DatastoreCache;
12#[cfg(not(target_family = "wasm"))]
13use crate::kvs::IndexBuilder;
14use crate::kvs::Transaction;
15use crate::mem::ALLOC;
16use crate::sql::value::Value;
17use async_channel::Sender;
18use std::borrow::Cow;
19use std::collections::HashMap;
20use std::fmt::{self, Debug};
21#[cfg(storage)]
22use std::path::PathBuf;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::Arc;
25use std::time::Duration;
26use trice::Instant;
27#[cfg(feature = "http")]
28use url::Url;
29
30pub type Context = Arc<MutableContext>;
31
32#[non_exhaustive]
33pub struct MutableContext {
34 parent: Option<Context>,
36 deadline: Option<Instant>,
38 cancelled: Arc<AtomicBool>,
40 values: HashMap<Cow<'static, str>, Arc<Value>>,
42 notifications: Option<Sender<Notification>>,
44 query_planner: Option<Arc<QueryPlanner>>,
46 query_executor: Option<QueryExecutor>,
48 iteration_stage: Option<IterationStage>,
50 cache: Option<Arc<DatastoreCache>>,
52 index_stores: IndexStores,
54 #[cfg(not(target_family = "wasm"))]
56 index_builder: Option<IndexBuilder>,
57 capabilities: Arc<Capabilities>,
59 #[cfg(storage)]
60 temporary_directory: Option<Arc<PathBuf>>,
62 transaction: Option<Arc<Transaction>>,
64 isolated: bool,
66}
67
68impl Default for MutableContext {
69 fn default() -> Self {
70 MutableContext::background()
71 }
72}
73
74impl From<Transaction> for MutableContext {
75 fn from(txn: Transaction) -> Self {
76 let mut ctx = MutableContext::background();
77 ctx.set_transaction(Arc::new(txn));
78 ctx
79 }
80}
81
82impl Debug for MutableContext {
83 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
84 f.debug_struct("Context")
85 .field("parent", &self.parent)
86 .field("deadline", &self.deadline)
87 .field("cancelled", &self.cancelled)
88 .field("values", &self.values)
89 .finish()
90 }
91}
92
93impl MutableContext {
94 pub(crate) fn background() -> Self {
96 Self {
97 values: HashMap::default(),
98 parent: None,
99 deadline: None,
100 cancelled: Arc::new(AtomicBool::new(false)),
101 notifications: None,
102 query_planner: None,
103 query_executor: None,
104 iteration_stage: None,
105 capabilities: Arc::new(Capabilities::default()),
106 index_stores: IndexStores::default(),
107 cache: None,
108 #[cfg(not(target_family = "wasm"))]
109 index_builder: None,
110 #[cfg(storage)]
111 temporary_directory: None,
112 transaction: None,
113 isolated: false,
114 }
115 }
116
117 pub(crate) fn new(parent: &Context) -> Self {
119 MutableContext {
120 values: HashMap::default(),
121 deadline: parent.deadline,
122 cancelled: Arc::new(AtomicBool::new(false)),
123 notifications: parent.notifications.clone(),
124 query_planner: parent.query_planner.clone(),
125 query_executor: parent.query_executor.clone(),
126 iteration_stage: parent.iteration_stage.clone(),
127 capabilities: parent.capabilities.clone(),
128 index_stores: parent.index_stores.clone(),
129 cache: parent.cache.clone(),
130 #[cfg(not(target_family = "wasm"))]
131 index_builder: parent.index_builder.clone(),
132 #[cfg(storage)]
133 temporary_directory: parent.temporary_directory.clone(),
134 transaction: parent.transaction.clone(),
135 isolated: false,
136 parent: Some(parent.clone()),
137 }
138 }
139
140 pub(crate) fn new_isolated(parent: &Context) -> Self {
144 Self {
145 values: HashMap::default(),
146 deadline: parent.deadline,
147 cancelled: Arc::new(AtomicBool::new(false)),
148 notifications: parent.notifications.clone(),
149 query_planner: parent.query_planner.clone(),
150 query_executor: parent.query_executor.clone(),
151 iteration_stage: parent.iteration_stage.clone(),
152 capabilities: parent.capabilities.clone(),
153 index_stores: parent.index_stores.clone(),
154 cache: parent.cache.clone(),
155 #[cfg(not(target_family = "wasm"))]
156 index_builder: parent.index_builder.clone(),
157 #[cfg(storage)]
158 temporary_directory: parent.temporary_directory.clone(),
159 transaction: parent.transaction.clone(),
160 isolated: true,
161 parent: Some(parent.clone()),
162 }
163 }
164
165 #[cfg(not(target_family = "wasm"))]
169 pub(crate) fn new_concurrent(from: &Context) -> Self {
170 Self {
171 values: HashMap::default(),
172 deadline: None,
173 cancelled: Arc::new(AtomicBool::new(false)),
174 notifications: from.notifications.clone(),
175 query_planner: from.query_planner.clone(),
176 query_executor: from.query_executor.clone(),
177 iteration_stage: from.iteration_stage.clone(),
178 capabilities: from.capabilities.clone(),
179 index_stores: from.index_stores.clone(),
180 cache: from.cache.clone(),
181 index_builder: from.index_builder.clone(),
182 #[cfg(storage)]
183 temporary_directory: from.temporary_directory.clone(),
184 transaction: None,
185 isolated: false,
186 parent: None,
187 }
188 }
189
190 pub(crate) fn from_ds(
192 time_out: Option<Duration>,
193 capabilities: Arc<Capabilities>,
194 index_stores: IndexStores,
195 cache: Arc<DatastoreCache>,
196 #[cfg(not(target_family = "wasm"))] index_builder: IndexBuilder,
197 #[cfg(storage)] temporary_directory: Option<Arc<PathBuf>>,
198 ) -> Result<MutableContext, Error> {
199 let mut ctx = Self {
200 values: HashMap::default(),
201 parent: None,
202 deadline: None,
203 cancelled: Arc::new(AtomicBool::new(false)),
204 notifications: None,
205 query_planner: None,
206 query_executor: None,
207 iteration_stage: None,
208 capabilities,
209 index_stores,
210 cache: Some(cache),
211 #[cfg(not(target_family = "wasm"))]
212 index_builder: Some(index_builder),
213 #[cfg(storage)]
214 temporary_directory,
215 transaction: None,
216 isolated: false,
217 };
218 if let Some(timeout) = time_out {
219 ctx.add_timeout(timeout)?;
220 }
221 Ok(ctx)
222 }
223
224 pub(crate) fn freeze(self) -> Context {
226 Arc::new(self)
227 }
228
229 pub(crate) fn unfreeze(ctx: Context) -> Result<MutableContext, Error> {
231 Arc::into_inner(ctx)
232 .ok_or_else(|| fail!("Tried to unfreeze a Context with multiple references"))
233 }
234
235 pub(crate) fn add_value<K>(&mut self, key: K, value: Arc<Value>)
238 where
239 K: Into<Cow<'static, str>>,
240 {
241 self.values.insert(key.into(), value);
242 }
243
244 pub(crate) fn add_values<T, K, V>(&mut self, iter: T)
247 where
248 T: IntoIterator<Item = (K, V)>,
249 K: Into<Cow<'static, str>>,
250 V: Into<Arc<Value>>,
251 {
252 self.values.extend(iter.into_iter().map(|(k, v)| (k.into(), v.into())))
253 }
254
255 pub(crate) fn add_cancel(&mut self) -> Canceller {
258 let cancelled = self.cancelled.clone();
259 Canceller::new(cancelled)
260 }
261
262 pub(crate) fn add_deadline(&mut self, deadline: Instant) {
265 match self.deadline {
266 Some(current) if current < deadline => (),
267 _ => self.deadline = Some(deadline),
268 }
269 }
270
271 pub(crate) fn add_timeout(&mut self, timeout: Duration) -> Result<(), Error> {
275 match Instant::now().checked_add(timeout) {
276 Some(deadline) => {
277 self.add_deadline(deadline);
278 Ok(())
279 }
280 None => Err(Error::InvalidTimeout(timeout.as_secs())),
281 }
282 }
283
284 pub(crate) fn add_notifications(&mut self, chn: Option<&Sender<Notification>>) {
287 self.notifications = chn.cloned()
288 }
289
290 pub(crate) fn set_query_planner(&mut self, qp: QueryPlanner) {
291 self.query_planner = Some(Arc::new(qp));
292 }
293
294 pub(crate) fn set_query_executor(&mut self, qe: QueryExecutor) {
295 self.query_executor = Some(qe);
296 }
297
298 pub(crate) fn set_iteration_stage(&mut self, is: IterationStage) {
299 self.iteration_stage = Some(is);
300 }
301
302 pub(crate) fn set_transaction(&mut self, txn: Arc<Transaction>) {
303 self.transaction = Some(txn);
304 }
305
306 pub(crate) fn tx(&self) -> Arc<Transaction> {
307 self.transaction
308 .clone()
309 .unwrap_or_else(|| unreachable!("The context was not associated with a transaction"))
310 }
311
312 pub(crate) fn timeout(&self) -> Option<Duration> {
315 self.deadline.map(|v| v.saturating_duration_since(Instant::now()))
316 }
317
318 pub(crate) fn notifications(&self) -> Option<Sender<Notification>> {
319 self.notifications.clone()
320 }
321
322 pub(crate) fn has_notifications(&self) -> bool {
323 self.notifications.is_some()
324 }
325
326 pub(crate) fn get_query_planner(&self) -> Option<&QueryPlanner> {
327 self.query_planner.as_ref().map(|qp| qp.as_ref())
328 }
329
330 pub(crate) fn get_query_executor(&self) -> Option<&QueryExecutor> {
331 self.query_executor.as_ref()
332 }
333
334 pub(crate) fn get_iteration_stage(&self) -> Option<&IterationStage> {
335 self.iteration_stage.as_ref()
336 }
337
338 pub(crate) fn get_index_stores(&self) -> &IndexStores {
340 &self.index_stores
341 }
342
343 #[cfg(not(target_family = "wasm"))]
345 pub(crate) fn get_index_builder(&self) -> Option<&IndexBuilder> {
346 self.index_builder.as_ref()
347 }
348
349 pub(crate) fn get_cache(&self) -> Option<Arc<DatastoreCache>> {
351 self.cache.clone()
352 }
353
354 pub(crate) fn done(&self, deep_check: bool) -> Result<Option<Reason>, Error> {
363 match self.deadline {
364 Some(deadline) if deep_check && deadline <= Instant::now() => {
365 Ok(Some(Reason::Timedout))
366 }
367 _ if self.cancelled.load(Ordering::Relaxed) => Ok(Some(Reason::Canceled)),
368 _ => {
369 if deep_check && ALLOC.is_beyond_threshold() {
370 return Err(Error::QueryBeyondMemoryThreshold);
371 }
372 match &self.parent {
373 Some(ctx) => ctx.done(deep_check),
374 _ => Ok(None),
375 }
376 }
377 }
378 }
379
380 pub(crate) fn is_ok(&self, deep_check: bool) -> Result<bool, Error> {
382 Ok(self.done(deep_check)?.is_none())
383 }
384
385 pub(crate) fn is_done(&self, deep_check: bool) -> Result<bool, Error> {
387 Ok(self.done(deep_check)?.is_some())
388 }
389
390 pub(crate) fn is_timedout(&self) -> Result<bool, Error> {
392 Ok(matches!(self.done(true)?, Some(Reason::Timedout)))
393 }
394
/// Borrows the temporary directory path held by this context, if any.
#[cfg(storage)]
pub(crate) fn temporary_directory(&self) -> Option<&Arc<PathBuf>> {
    self.temporary_directory.as_ref()
}
400
401 pub(crate) fn value(&self, key: &str) -> Option<&Value> {
404 match self.values.get(key) {
405 Some(v) => Some(v.as_ref()),
406 None if PROTECTED_PARAM_NAMES.contains(&key) || !self.isolated => match &self.parent {
407 Some(p) => p.value(key),
408 _ => None,
409 },
410 None => None,
411 }
412 }
413
/// Builds a `Cancellation` from this context's deadline and the cancellation
/// flags of this context and every ancestor.
#[cfg(feature = "scripting")]
pub(crate) fn cancellation(&self) -> crate::ctx::cancellation::Cancellation {
    use crate::ctx::cancellation::Cancellation;

    // Walk from this context up through the parent chain.
    let lineage = std::iter::successors(Some(self), |ctx| ctx.parent.as_deref());
    Cancellation::new(
        self.deadline,
        lineage.map(|ctx| Arc::clone(&ctx.cancelled)).collect(),
    )
}
424
425 pub(crate) fn add_capabilities(&mut self, caps: Arc<Capabilities>) {
431 self.capabilities = caps;
432 }
433
434 #[allow(dead_code)]
436 pub(crate) fn get_capabilities(&self) -> Arc<Capabilities> {
437 self.capabilities.clone()
438 }
439
440 #[allow(dead_code)]
442 pub(crate) fn check_allowed_scripting(&self) -> Result<(), Error> {
443 if !self.capabilities.allows_scripting() {
444 warn!("Capabilities denied scripting attempt");
445 return Err(Error::ScriptingNotAllowed);
446 }
447 trace!("Capabilities allowed scripting");
448 Ok(())
449 }
450
451 pub(crate) fn check_allowed_function(&self, target: &str) -> Result<(), Error> {
453 if !self.capabilities.allows_function_name(target) {
454 warn!("Capabilities denied function execution attempt, target: '{target}'");
455 return Err(Error::FunctionNotAllowed(target.to_string()));
456 }
457 trace!("Capabilities allowed function execution, target: '{target}'");
458 Ok(())
459 }
460
/// Checks whether the capabilities allow an outgoing connection to the host
/// (and port, as given by `port_or_known_default`) of `url`, logging the
/// outcome.
///
/// # Errors
///
/// Returns `Error::InvalidUrl` when `url` has no host, and
/// `Error::NetTargetNotAllowed` when the target is denied.
#[cfg(feature = "http")]
pub(crate) fn check_allowed_net(&self, url: &Url) -> Result<(), Error> {
    let Some(host) = url.host() else {
        return Err(Error::InvalidUrl(url.to_string()));
    };
    let target = NetTarget::Host(host.to_owned(), url.port_or_known_default());
    if !self.capabilities.allows_network_target(&target) {
        warn!(
            "Capabilities denied outgoing network connection attempt, target: '{target}'"
        );
        return Err(Error::NetTargetNotAllowed(target.to_string()));
    }
    trace!("Capabilities allowed outgoing network connection, target: '{target}'");
    Ok(())
}
479}