1use crate::cnf::PROTECTED_PARAM_NAMES;
2use crate::ctx::canceller::Canceller;
3use crate::ctx::reason::Reason;
4#[cfg(feature = "http")]
5use crate::dbs::capabilities::NetTarget;
6use crate::dbs::{Capabilities, Notification};
7use crate::err::Error;
8use crate::idx::planner::executor::QueryExecutor;
9use crate::idx::planner::{IterationStage, QueryPlanner};
10use crate::idx::trees::store::IndexStores;
11use crate::kvs::cache::ds::DatastoreCache;
12#[cfg(not(target_family = "wasm"))]
13use crate::kvs::IndexBuilder;
14use crate::kvs::Transaction;
15use crate::sql::value::Value;
16use async_channel::Sender;
17use std::borrow::Cow;
18use std::collections::HashMap;
19use std::fmt::{self, Debug};
20#[cfg(storage)]
21use std::path::PathBuf;
22use std::sync::atomic::{AtomicBool, Ordering};
23use std::sync::Arc;
24use std::time::Duration;
25use trice::Instant;
26#[cfg(feature = "http")]
27use url::Url;
28
/// A shared, reference-counted handle to a [`MutableContext`].
pub type Context = Arc<MutableContext>;
30
/// The state carried by an execution context.
///
/// A context may link to a parent context; [`MutableContext::done`] and
/// [`MutableContext::value`] walk that chain. Wrapping a value in an `Arc`
/// yields the shared [`Context`] handle.
#[non_exhaustive]
pub struct MutableContext {
    /// Optional parent context, consulted by `done` and `value`.
    parent: Option<Context>,
    /// Optional instant at or after which `done` reports a timeout.
    deadline: Option<Instant>,
    /// Flag read by `done`; when set the context reports cancellation.
    cancelled: Arc<AtomicBool>,
    /// Named values stored directly on this context.
    values: HashMap<Cow<'static, str>, Arc<Value>>,
    /// Optional channel sender for notifications.
    notifications: Option<Sender<Notification>>,
    /// Optional shared query planner.
    query_planner: Option<Arc<QueryPlanner>>,
    /// Optional query executor.
    query_executor: Option<QueryExecutor>,
    /// Optional iteration stage.
    iteration_stage: Option<IterationStage>,
    /// Optional shared datastore cache.
    cache: Option<Arc<DatastoreCache>>,
    /// Index stores handle.
    index_stores: IndexStores,
    /// Optional index builder (not compiled for wasm targets).
    #[cfg(not(target_family = "wasm"))]
    index_builder: Option<IndexBuilder>,
    /// Capabilities consulted by the `check_allowed_*` methods.
    capabilities: Arc<Capabilities>,
    /// Optional temporary directory (only with the `storage` cfg).
    #[cfg(storage)]
    temporary_directory: Option<Arc<PathBuf>>,
    /// Optional transaction associated with this context.
    transaction: Option<Arc<Transaction>>,
    /// When `true`, `value` only falls back to the parent for protected
    /// parameter names.
    isolated: bool,
}
66
67impl Default for MutableContext {
68 fn default() -> Self {
69 MutableContext::background()
70 }
71}
72
73impl From<Transaction> for MutableContext {
74 fn from(txn: Transaction) -> Self {
75 let mut ctx = MutableContext::background();
76 ctx.set_transaction(Arc::new(txn));
77 ctx
78 }
79}
80
81impl Debug for MutableContext {
82 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83 f.debug_struct("Context")
84 .field("parent", &self.parent)
85 .field("deadline", &self.deadline)
86 .field("cancelled", &self.cancelled)
87 .field("values", &self.values)
88 .finish()
89 }
90}
91
92impl MutableContext {
93 pub(crate) fn background() -> Self {
95 Self {
96 values: HashMap::default(),
97 parent: None,
98 deadline: None,
99 cancelled: Arc::new(AtomicBool::new(false)),
100 notifications: None,
101 query_planner: None,
102 query_executor: None,
103 iteration_stage: None,
104 capabilities: Arc::new(Capabilities::default()),
105 index_stores: IndexStores::default(),
106 cache: None,
107 #[cfg(not(target_family = "wasm"))]
108 index_builder: None,
109 #[cfg(storage)]
110 temporary_directory: None,
111 transaction: None,
112 isolated: false,
113 }
114 }
115
116 pub(crate) fn new(parent: &Context) -> Self {
118 MutableContext {
119 values: HashMap::default(),
120 deadline: parent.deadline,
121 cancelled: Arc::new(AtomicBool::new(false)),
122 notifications: parent.notifications.clone(),
123 query_planner: parent.query_planner.clone(),
124 query_executor: parent.query_executor.clone(),
125 iteration_stage: parent.iteration_stage.clone(),
126 capabilities: parent.capabilities.clone(),
127 index_stores: parent.index_stores.clone(),
128 cache: parent.cache.clone(),
129 #[cfg(not(target_family = "wasm"))]
130 index_builder: parent.index_builder.clone(),
131 #[cfg(storage)]
132 temporary_directory: parent.temporary_directory.clone(),
133 transaction: parent.transaction.clone(),
134 isolated: false,
135 parent: Some(parent.clone()),
136 }
137 }
138
139 pub(crate) fn new_isolated(parent: &Context) -> Self {
143 Self {
144 values: HashMap::default(),
145 deadline: parent.deadline,
146 cancelled: Arc::new(AtomicBool::new(false)),
147 notifications: parent.notifications.clone(),
148 query_planner: parent.query_planner.clone(),
149 query_executor: parent.query_executor.clone(),
150 iteration_stage: parent.iteration_stage.clone(),
151 capabilities: parent.capabilities.clone(),
152 index_stores: parent.index_stores.clone(),
153 cache: parent.cache.clone(),
154 #[cfg(not(target_family = "wasm"))]
155 index_builder: parent.index_builder.clone(),
156 #[cfg(storage)]
157 temporary_directory: parent.temporary_directory.clone(),
158 transaction: parent.transaction.clone(),
159 isolated: true,
160 parent: Some(parent.clone()),
161 }
162 }
163
164 #[cfg(not(target_family = "wasm"))]
168 pub(crate) fn new_concurrent(from: &Context) -> Self {
169 Self {
170 values: HashMap::default(),
171 deadline: None,
172 cancelled: Arc::new(AtomicBool::new(false)),
173 notifications: from.notifications.clone(),
174 query_planner: from.query_planner.clone(),
175 query_executor: from.query_executor.clone(),
176 iteration_stage: from.iteration_stage.clone(),
177 capabilities: from.capabilities.clone(),
178 index_stores: from.index_stores.clone(),
179 cache: from.cache.clone(),
180 index_builder: from.index_builder.clone(),
181 #[cfg(storage)]
182 temporary_directory: from.temporary_directory.clone(),
183 transaction: None,
184 isolated: false,
185 parent: None,
186 }
187 }
188
189 pub(crate) fn from_ds(
191 time_out: Option<Duration>,
192 capabilities: Arc<Capabilities>,
193 index_stores: IndexStores,
194 cache: Arc<DatastoreCache>,
195 #[cfg(not(target_family = "wasm"))] index_builder: IndexBuilder,
196 #[cfg(storage)] temporary_directory: Option<Arc<PathBuf>>,
197 ) -> Result<MutableContext, Error> {
198 let mut ctx = Self {
199 values: HashMap::default(),
200 parent: None,
201 deadline: None,
202 cancelled: Arc::new(AtomicBool::new(false)),
203 notifications: None,
204 query_planner: None,
205 query_executor: None,
206 iteration_stage: None,
207 capabilities,
208 index_stores,
209 cache: Some(cache),
210 #[cfg(not(target_family = "wasm"))]
211 index_builder: Some(index_builder),
212 #[cfg(storage)]
213 temporary_directory,
214 transaction: None,
215 isolated: false,
216 };
217 if let Some(timeout) = time_out {
218 ctx.add_timeout(timeout)?;
219 }
220 Ok(ctx)
221 }
222
223 pub(crate) fn freeze(self) -> Context {
225 Arc::new(self)
226 }
227
228 pub(crate) fn unfreeze(ctx: Context) -> Result<MutableContext, Error> {
230 Arc::into_inner(ctx)
231 .ok_or_else(|| fail!("Tried to unfreeze a Context with multiple references"))
232 }
233
234 pub(crate) fn add_value<K>(&mut self, key: K, value: Arc<Value>)
237 where
238 K: Into<Cow<'static, str>>,
239 {
240 self.values.insert(key.into(), value);
241 }
242
243 pub(crate) fn add_values<T, K, V>(&mut self, iter: T)
246 where
247 T: IntoIterator<Item = (K, V)>,
248 K: Into<Cow<'static, str>>,
249 V: Into<Arc<Value>>,
250 {
251 self.values.extend(iter.into_iter().map(|(k, v)| (k.into(), v.into())))
252 }
253
254 pub(crate) fn add_cancel(&mut self) -> Canceller {
257 let cancelled = self.cancelled.clone();
258 Canceller::new(cancelled)
259 }
260
261 pub(crate) fn add_deadline(&mut self, deadline: Instant) {
264 match self.deadline {
265 Some(current) if current < deadline => (),
266 _ => self.deadline = Some(deadline),
267 }
268 }
269
270 pub(crate) fn add_timeout(&mut self, timeout: Duration) -> Result<(), Error> {
274 match Instant::now().checked_add(timeout) {
275 Some(deadline) => {
276 self.add_deadline(deadline);
277 Ok(())
278 }
279 None => Err(Error::InvalidTimeout(timeout.as_secs())),
280 }
281 }
282
283 pub(crate) fn add_notifications(&mut self, chn: Option<&Sender<Notification>>) {
286 self.notifications = chn.cloned()
287 }
288
289 pub(crate) fn set_query_planner(&mut self, qp: QueryPlanner) {
290 self.query_planner = Some(Arc::new(qp));
291 }
292
293 pub(crate) fn set_query_executor(&mut self, qe: QueryExecutor) {
294 self.query_executor = Some(qe);
295 }
296
297 pub(crate) fn set_iteration_stage(&mut self, is: IterationStage) {
298 self.iteration_stage = Some(is);
299 }
300
301 pub(crate) fn set_transaction(&mut self, txn: Arc<Transaction>) {
302 self.transaction = Some(txn);
303 }
304
305 pub(crate) fn tx(&self) -> Arc<Transaction> {
306 self.transaction
307 .clone()
308 .unwrap_or_else(|| unreachable!("The context was not associated with a transaction"))
309 }
310
311 pub(crate) fn timeout(&self) -> Option<Duration> {
314 self.deadline.map(|v| v.saturating_duration_since(Instant::now()))
315 }
316
317 pub(crate) fn notifications(&self) -> Option<Sender<Notification>> {
318 self.notifications.clone()
319 }
320
321 pub(crate) fn has_notifications(&self) -> bool {
322 self.notifications.is_some()
323 }
324
325 pub(crate) fn get_query_planner(&self) -> Option<&QueryPlanner> {
326 self.query_planner.as_ref().map(|qp| qp.as_ref())
327 }
328
329 pub(crate) fn get_query_executor(&self) -> Option<&QueryExecutor> {
330 self.query_executor.as_ref()
331 }
332
333 pub(crate) fn get_iteration_stage(&self) -> Option<&IterationStage> {
334 self.iteration_stage.as_ref()
335 }
336
337 pub(crate) fn get_index_stores(&self) -> &IndexStores {
339 &self.index_stores
340 }
341
342 #[cfg(not(target_family = "wasm"))]
344 pub(crate) fn get_index_builder(&self) -> Option<&IndexBuilder> {
345 self.index_builder.as_ref()
346 }
347
348 pub(crate) fn get_cache(&self) -> Option<Arc<DatastoreCache>> {
350 self.cache.clone()
351 }
352
353 pub(crate) fn done(&self, check_deadline: bool) -> Option<Reason> {
362 match self.deadline {
363 Some(deadline) if check_deadline && deadline <= Instant::now() => {
364 Some(Reason::Timedout)
365 }
366 _ if self.cancelled.load(Ordering::Relaxed) => Some(Reason::Canceled),
367 _ => match &self.parent {
368 Some(ctx) => ctx.done(check_deadline),
369 _ => None,
370 },
371 }
372 }
373
374 pub(crate) fn is_ok(&self, check_deadline: bool) -> bool {
376 self.done(check_deadline).is_none()
377 }
378
379 pub(crate) fn is_done(&self, check_deadline: bool) -> bool {
381 self.done(check_deadline).is_some()
382 }
383
384 pub(crate) fn is_timedout(&self) -> bool {
386 matches!(self.done(true), Some(Reason::Timedout))
387 }
388
389 #[cfg(storage)]
390 pub(crate) fn temporary_directory(&self) -> Option<&Arc<PathBuf>> {
392 self.temporary_directory.as_ref()
393 }
394
395 pub(crate) fn value(&self, key: &str) -> Option<&Value> {
398 match self.values.get(key) {
399 Some(v) => Some(v.as_ref()),
400 None if PROTECTED_PARAM_NAMES.contains(&key) || !self.isolated => match &self.parent {
401 Some(p) => p.value(key),
402 _ => None,
403 },
404 None => None,
405 }
406 }
407
408 #[cfg(feature = "scripting")]
410 pub(crate) fn cancellation(&self) -> crate::ctx::cancellation::Cancellation {
411 crate::ctx::cancellation::Cancellation::new(
412 self.deadline,
413 std::iter::successors(Some(self), |ctx| ctx.parent.as_ref().map(|c| c.as_ref()))
414 .map(|ctx| ctx.cancelled.clone())
415 .collect(),
416 )
417 }
418
419 pub(crate) fn add_capabilities(&mut self, caps: Arc<Capabilities>) {
425 self.capabilities = caps;
426 }
427
428 #[allow(dead_code)]
430 pub(crate) fn get_capabilities(&self) -> Arc<Capabilities> {
431 self.capabilities.clone()
432 }
433
434 #[allow(dead_code)]
436 pub(crate) fn check_allowed_scripting(&self) -> Result<(), Error> {
437 if !self.capabilities.allows_scripting() {
438 warn!("Capabilities denied scripting attempt");
439 return Err(Error::ScriptingNotAllowed);
440 }
441 trace!("Capabilities allowed scripting");
442 Ok(())
443 }
444
445 pub(crate) fn check_allowed_function(&self, target: &str) -> Result<(), Error> {
447 if !self.capabilities.allows_function_name(target) {
448 warn!("Capabilities denied function execution attempt, target: '{target}'");
449 return Err(Error::FunctionNotAllowed(target.to_string()));
450 }
451 trace!("Capabilities allowed function execution, target: '{target}'");
452 Ok(())
453 }
454
455 #[cfg(feature = "http")]
457 pub(crate) fn check_allowed_net(&self, url: &Url) -> Result<(), Error> {
458 match url.host() {
459 Some(host) => {
460 let target = &NetTarget::Host(host.to_owned(), url.port_or_known_default());
461 if !self.capabilities.allows_network_target(target) {
462 warn!(
463 "Capabilities denied outgoing network connection attempt, target: '{target}'"
464 );
465 return Err(Error::NetTargetNotAllowed(target.to_string()));
466 }
467 trace!("Capabilities allowed outgoing network connection, target: '{target}'");
468 Ok(())
469 }
470 _ => Err(Error::InvalidUrl(url.to_string())),
471 }
472 }
473}