use crate::cnf::PROTECTED_PARAM_NAMES;
use crate::ctx::canceller::Canceller;
use crate::ctx::reason::Reason;
#[cfg(feature = "http")]
use crate::dbs::capabilities::NetTarget;
use crate::dbs::{Capabilities, Notification};
use crate::err::Error;
use crate::idx::planner::executor::QueryExecutor;
use crate::idx::planner::{IterationStage, QueryPlanner};
use crate::idx::trees::store::IndexStores;
#[cfg(not(target_arch = "wasm32"))]
use crate::kvs::IndexBuilder;
use crate::kvs::Transaction;
use crate::sql::value::Value;
use channel::Sender;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::{self, Debug};
#[cfg(storage)]
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use trice::Instant;
#[cfg(feature = "http")]
use url::Url;
/// Converts an owned [`Value`] into the owned variant of a [`Cow`].
impl<'a> From<Value> for Cow<'a, Value> {
    fn from(value: Value) -> Self {
        Self::Owned(value)
    }
}
/// Converts a borrowed [`Value`] into the borrowed variant of a [`Cow`],
/// keeping the lifetime of the original reference.
impl<'a> From<&'a Value> for Cow<'a, Value> {
    fn from(value: &'a Value) -> Self {
        Self::Borrowed(value)
    }
}
/// A reference-counted, shared handle to a [`MutableContext`].
pub type Context = Arc<MutableContext>;
/// Execution context carrying a deadline, a cancellation flag, keyed values
/// and handles to shared resources. Contexts can be chained through `parent`.
#[non_exhaustive]
pub struct MutableContext {
/// The context this one was derived from, if any; `done` and `value` fall back to it.
parent: Option<Context>,
/// Instant at or after which `done` reports `Reason::Timedout`; `None` means no deadline.
deadline: Option<Instant>,
/// Cancellation flag read by `done`; `add_cancel` hands a clone of it to a `Canceller`.
cancelled: Arc<AtomicBool>,
/// Values stored directly on this context, keyed by name.
values: HashMap<Cow<'static, str>, Arc<Value>>,
/// Optional channel sender for `Notification`s.
notifications: Option<Sender<Notification>>,
/// Optional shared query planner.
query_planner: Option<Arc<QueryPlanner>>,
/// Optional query executor.
query_executor: Option<QueryExecutor>,
/// Optional iteration stage.
iteration_stage: Option<IterationStage>,
/// Handle to the index stores.
index_stores: IndexStores,
/// Optional index builder; this field is not compiled on `wasm32` targets.
#[cfg(not(target_arch = "wasm32"))]
index_builder: Option<IndexBuilder>,
/// Capabilities consulted by the `check_allowed_*` methods.
capabilities: Arc<Capabilities>,
/// Optional temporary directory path; only compiled when the `storage` cfg is set.
#[cfg(storage)]
temporary_directory: Option<Arc<PathBuf>>,
/// Transaction associated with this context, if any; returned by `tx`.
transaction: Option<Arc<Transaction>>,
/// When true, `value` only consults the parent for names in `PROTECTED_PARAM_NAMES`.
isolated: bool,
}
impl Default for MutableContext {
fn default() -> Self {
MutableContext::background()
}
}
impl From<Transaction> for MutableContext {
fn from(txn: Transaction) -> Self {
let mut ctx = MutableContext::background();
ctx.set_transaction(Arc::new(txn));
ctx
}
}
/// Debug output is labelled `Context` and lists only the parent, deadline,
/// cancellation flag and values; the remaining fields are omitted.
impl Debug for MutableContext {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self {
            parent,
            deadline,
            cancelled,
            values,
            ..
        } = self;
        let mut out = f.debug_struct("Context");
        out.field("parent", parent);
        out.field("deadline", deadline);
        out.field("cancelled", cancelled);
        out.field("values", values);
        out.finish()
    }
}
impl MutableContext {
/// Creates a parentless context from the supplied capabilities, index stores,
/// index builder (non-wasm32 only) and temporary directory (`storage` cfg only).
///
/// When `time_out` is given it is applied through `add_timeout`.
///
/// # Errors
///
/// Propagates the error from `add_timeout` if the timeout cannot be applied.
pub(crate) fn from_ds(
    time_out: Option<Duration>,
    capabilities: Capabilities,
    index_stores: IndexStores,
    #[cfg(not(target_arch = "wasm32"))] index_builder: IndexBuilder,
    #[cfg(storage)] temporary_directory: Option<Arc<PathBuf>>,
) -> Result<MutableContext, Error> {
    let mut root = Self {
        parent: None,
        deadline: None,
        cancelled: Arc::new(AtomicBool::new(false)),
        values: HashMap::default(),
        notifications: None,
        query_planner: None,
        query_executor: None,
        iteration_stage: None,
        index_stores,
        #[cfg(not(target_arch = "wasm32"))]
        index_builder: Some(index_builder),
        capabilities: Arc::new(capabilities),
        #[cfg(storage)]
        temporary_directory,
        transaction: None,
        isolated: false,
    };
    match time_out {
        None => Ok(root),
        Some(limit) => {
            root.add_timeout(limit)?;
            Ok(root)
        }
    }
}
/// Creates an empty context: no parent, no deadline, no values, no transaction,
/// default capabilities and default index stores.
pub fn background() -> Self {
    Self {
        parent: None,
        deadline: None,
        cancelled: Arc::new(AtomicBool::new(false)),
        values: HashMap::default(),
        notifications: None,
        query_planner: None,
        query_executor: None,
        iteration_stage: None,
        index_stores: IndexStores::default(),
        #[cfg(not(target_arch = "wasm32"))]
        index_builder: None,
        capabilities: Arc::new(Capabilities::default()),
        #[cfg(storage)]
        temporary_directory: None,
        transaction: None,
        isolated: false,
    }
}
pub fn new(parent: &Context) -> Self {
MutableContext {
values: HashMap::default(),
deadline: parent.deadline,
cancelled: Arc::new(AtomicBool::new(false)),
notifications: parent.notifications.clone(),
query_planner: parent.query_planner.clone(),
query_executor: parent.query_executor.clone(),
iteration_stage: parent.iteration_stage.clone(),
capabilities: parent.capabilities.clone(),
index_stores: parent.index_stores.clone(),
#[cfg(not(target_arch = "wasm32"))]
index_builder: parent.index_builder.clone(),
#[cfg(storage)]
temporary_directory: parent.temporary_directory.clone(),
transaction: parent.transaction.clone(),
isolated: false,
parent: Some(parent.clone()),
}
}
/// Consumes the context and wraps it in an `Arc`, producing a shareable [`Context`].
pub(crate) fn freeze(self) -> Context {
    Arc::from(self)
}
/// Takes the [`MutableContext`] back out of a shared [`Context`].
///
/// # Errors
///
/// Returns the `fail!` error when `Arc::try_unwrap` fails, i.e. when `ctx`
/// is not the only strong reference to the context.
pub(crate) fn unfreeze(ctx: Context) -> Result<MutableContext, Error> {
    Arc::try_unwrap(ctx).map_err(|_| fail!("Tried to unfreeze a non-existent Context"))
}
pub fn new_isolated(parent: &Context) -> Self {
Self {
values: HashMap::default(),
deadline: parent.deadline,
cancelled: Arc::new(AtomicBool::new(false)),
notifications: parent.notifications.clone(),
query_planner: parent.query_planner.clone(),
query_executor: parent.query_executor.clone(),
iteration_stage: parent.iteration_stage.clone(),
capabilities: parent.capabilities.clone(),
index_stores: parent.index_stores.clone(),
#[cfg(not(target_arch = "wasm32"))]
index_builder: parent.index_builder.clone(),
#[cfg(storage)]
temporary_directory: parent.temporary_directory.clone(),
transaction: parent.transaction.clone(),
isolated: true,
parent: Some(parent.clone()),
}
}
/// Creates a detached context modelled on `from`.
///
/// The notification sender, planner, executor, iteration stage, capabilities
/// and index handles are copied from `from`. Unlike `new`, the result has no
/// parent, no deadline and no transaction, and it gets its own cancellation flag.
pub fn new_concurrent(from: &Context) -> Self {
    Self {
        parent: None,
        deadline: None,
        cancelled: Arc::new(AtomicBool::new(false)),
        values: HashMap::default(),
        notifications: from.notifications.clone(),
        query_planner: from.query_planner.clone(),
        query_executor: from.query_executor.clone(),
        iteration_stage: from.iteration_stage.clone(),
        index_stores: from.index_stores.clone(),
        #[cfg(not(target_arch = "wasm32"))]
        index_builder: from.index_builder.clone(),
        capabilities: Arc::clone(&from.capabilities),
        #[cfg(storage)]
        temporary_directory: from.temporary_directory.clone(),
        transaction: None,
        isolated: false,
    }
}
/// Stores `value` under `key` on this context, replacing any value
/// previously stored under the same key.
pub fn add_value<K: Into<Cow<'static, str>>>(&mut self, key: K, value: Arc<Value>) {
    let name: Cow<'static, str> = key.into();
    self.values.insert(name, value);
}
pub fn add_cancel(&mut self) -> Canceller {
let cancelled = self.cancelled.clone();
Canceller::new(cancelled)
}
/// Applies `deadline` unless a strictly earlier deadline is already set,
/// so an existing deadline is never pushed later.
pub fn add_deadline(&mut self, deadline: Instant) {
    let keep_existing = matches!(self.deadline, Some(current) if current < deadline);
    if !keep_existing {
        self.deadline = Some(deadline);
    }
}
pub fn add_timeout(&mut self, timeout: Duration) -> Result<(), Error> {
match Instant::now().checked_add(timeout) {
Some(deadline) => {
self.add_deadline(deadline);
Ok(())
}
None => Err(Error::InvalidTimeout(timeout.as_secs())),
}
}
/// Replaces the notification sender with a clone of `chn`, or clears it when `chn` is `None`.
pub fn add_notifications(&mut self, chn: Option<&Sender<Notification>>) {
    self.notifications = chn.map(Clone::clone);
}
/// Stores `qp` on this context behind an `Arc`, replacing any previous planner.
pub(crate) fn set_query_planner(&mut self, qp: QueryPlanner) {
    let shared = Arc::new(qp);
    self.query_planner = Some(shared);
}
/// Stores `qe` as this context's query executor, replacing any previous one.
pub(crate) fn set_query_executor(&mut self, qe: QueryExecutor) {
self.query_executor = Some(qe);
}
/// Stores `is` as this context's iteration stage, replacing any previous one.
pub(crate) fn set_iteration_stage(&mut self, is: IterationStage) {
self.iteration_stage = Some(is);
}
/// Associates `txn` with this context, replacing any previous transaction.
pub(crate) fn set_transaction(&mut self, txn: Arc<Transaction>) {
self.transaction = Some(txn);
}
/// Returns a shared handle to the transaction associated with this context.
///
/// # Panics
///
/// Panics (via `unreachable!`) when no transaction has been set.
pub(crate) fn tx(&self) -> Arc<Transaction> {
    match &self.transaction {
        Some(txn) => Arc::clone(txn),
        None => unreachable!("The context was not associated with a transaction"),
    }
}
/// Returns the time left until this context's own deadline, or `None` when
/// no deadline is set. The subtraction saturates instead of failing when the
/// deadline is already in the past.
pub fn timeout(&self) -> Option<Duration> {
    let deadline = self.deadline?;
    Some(deadline.saturating_duration_since(Instant::now()))
}
/// Returns a clone of the notification sender, if one is set.
pub fn notifications(&self) -> Option<Sender<Notification>> {
self.notifications.clone()
}
/// Borrows the query planner, if one is set.
pub(crate) fn get_query_planner(&self) -> Option<&QueryPlanner> {
    self.query_planner.as_deref()
}
/// Borrows the query executor, if one is set.
pub(crate) fn get_query_executor(&self) -> Option<&QueryExecutor> {
self.query_executor.as_ref()
}
/// Borrows the iteration stage, if one is set.
pub(crate) fn get_iteration_stage(&self) -> Option<&IterationStage> {
self.iteration_stage.as_ref()
}
/// Borrows the index stores held by this context.
pub(crate) fn get_index_stores(&self) -> &IndexStores {
&self.index_stores
}
/// Borrows the index builder, if one is set. Not available on `wasm32` targets.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn get_index_builder(&self) -> Option<&IndexBuilder> {
self.index_builder.as_ref()
}
/// Reports why this context should stop, or `None` if it may continue.
///
/// Checks are made in order: this context's own deadline (`Reason::Timedout`),
/// then its own cancellation flag (`Reason::Canceled`), then the parent's
/// `done` result, if there is a parent.
pub fn done(&self) -> Option<Reason> {
    if let Some(deadline) = self.deadline {
        if deadline <= Instant::now() {
            return Some(Reason::Timedout);
        }
    }
    if self.cancelled.load(Ordering::Relaxed) {
        return Some(Reason::Canceled);
    }
    // Nothing stops this context itself, so defer to the ancestors.
    self.parent.as_ref().and_then(|ctx| ctx.done())
}
/// Returns `true` while `done` reports no reason to stop.
pub fn is_ok(&self) -> bool {
    !self.is_done()
}
/// Returns `true` when `done` reports any reason to stop.
pub fn is_done(&self) -> bool {
self.done().is_some()
}
/// Returns `true` only when `done` reports `Reason::Timedout`.
pub fn is_timedout(&self) -> bool {
    let outcome = self.done();
    matches!(outcome, Some(Reason::Timedout))
}
/// Borrows the temporary directory path, if one is set.
/// Only compiled when the `storage` cfg is set.
#[cfg(storage)]
pub fn temporary_directory(&self) -> Option<&Arc<PathBuf>> {
self.temporary_directory.as_ref()
}
/// Looks up the value stored under `key`.
///
/// Values set directly on this context win. Otherwise the lookup continues in
/// the parent, but only when this context is not isolated or `key` is one of
/// `PROTECTED_PARAM_NAMES`. Returns `None` when nothing is found.
pub fn value(&self, key: &str) -> Option<&Value> {
    if let Some(found) = self.values.get(key) {
        return Some(found.as_ref());
    }
    let may_inherit = PROTECTED_PARAM_NAMES.contains(&key) || !self.isolated;
    if !may_inherit {
        return None;
    }
    self.parent.as_ref().and_then(|p| p.value(key))
}
/// Builds a `Cancellation` from this context's deadline and the cancellation
/// flags of this context and every ancestor reachable through `parent`.
/// Only compiled with the `scripting` feature.
#[cfg(feature = "scripting")]
pub fn cancellation(&self) -> crate::ctx::cancellation::Cancellation {
    // Walk from this context up through each parent in turn.
    let lineage = std::iter::successors(Some(self), |ctx| ctx.parent.as_deref());
    let flags = lineage.map(|ctx| Arc::clone(&ctx.cancelled)).collect();
    crate::ctx::cancellation::Cancellation::new(self.deadline, flags)
}
/// Replaces this context's capabilities with `caps`.
pub fn add_capabilities(&mut self, caps: Capabilities) {
    let shared = Arc::new(caps);
    self.capabilities = shared;
}
/// Returns a shared handle to this context's capabilities.
#[allow(dead_code)]
pub fn get_capabilities(&self) -> Arc<Capabilities> {
    Arc::clone(&self.capabilities)
}
#[allow(dead_code)]
pub fn check_allowed_scripting(&self) -> Result<(), Error> {
if !self.capabilities.allows_scripting() {
return Err(Error::ScriptingNotAllowed);
}
Ok(())
}
pub fn check_allowed_function(&self, target: &str) -> Result<(), Error> {
if !self.capabilities.allows_function_name(target) {
return Err(Error::FunctionNotAllowed(target.to_string()));
}
Ok(())
}
/// Checks whether the capabilities allow network access to `target`.
/// Only compiled with the `http` feature.
///
/// The check is made against the URL's host together with its port
/// (`port_or_known_default`).
///
/// # Errors
///
/// Returns `Error::NetTargetNotAllowed` carrying the URL text when the URL
/// has no host or the host/port pair is not allowed.
#[cfg(feature = "http")]
pub fn check_allowed_net(&self, target: &Url) -> Result<(), Error> {
    if let Some(host) = target.host() {
        let endpoint = NetTarget::Host(host.to_owned(), target.port_or_known_default());
        if self.capabilities.allows_network_target(&endpoint) {
            return Ok(());
        }
    }
    Err(Error::NetTargetNotAllowed(target.to_string()))
}
}