use crate::ctx::canceller::Canceller;
use crate::ctx::reason::Reason;
#[cfg(feature = "http")]
use crate::dbs::capabilities::NetTarget;
use crate::dbs::{Capabilities, Notification};
use crate::err::Error;
use crate::idx::planner::executor::QueryExecutor;
use crate::idx::planner::{IterationStage, QueryPlanner};
use crate::idx::trees::store::IndexStores;
use crate::kvs::Transaction;
use crate::sql::value::Value;
use channel::Sender;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::{self, Debug};
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
))]
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use trice::Instant;
#[cfg(feature = "http")]
use url::Url;
impl<'a> From<Value> for Cow<'a, Value> {
fn from(v: Value) -> Cow<'a, Value> {
Cow::Owned(v)
}
}
impl<'a> From<&'a Value> for Cow<'a, Value> {
fn from(v: &'a Value) -> Cow<'a, Value> {
Cow::Borrowed(v)
}
}
#[non_exhaustive]
pub struct Context<'a> {
parent: Option<&'a Context<'a>>,
deadline: Option<Instant>,
cancelled: Arc<AtomicBool>,
values: HashMap<Cow<'static, str>, Cow<'a, Value>>,
notifications: Option<Sender<Notification>>,
query_planner: Option<&'a QueryPlanner<'a>>,
query_executor: Option<QueryExecutor>,
iteration_stage: Option<IterationStage>,
index_stores: IndexStores,
capabilities: Arc<Capabilities>,
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
))]
temporary_directory: Option<Arc<PathBuf>>,
transaction: Option<Arc<Transaction>>,
isolated: bool,
}
impl<'a> Default for Context<'a> {
fn default() -> Self {
Context::background()
}
}
impl<'a> From<Transaction> for Context<'a> {
fn from(txn: Transaction) -> Self {
Context::background().with_transaction(Arc::new(txn))
}
}
impl<'a> Debug for Context<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Context")
.field("parent", &self.parent)
.field("deadline", &self.deadline)
.field("cancelled", &self.cancelled)
.field("values", &self.values)
.finish()
}
}
impl<'a> Context<'a> {
pub(crate) fn from_ds(
time_out: Option<Duration>,
capabilities: Capabilities,
index_stores: IndexStores,
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
))]
temporary_directory: Option<Arc<PathBuf>>,
) -> Result<Context<'a>, Error> {
let mut ctx = Self {
values: HashMap::default(),
parent: None,
deadline: None,
cancelled: Arc::new(AtomicBool::new(false)),
notifications: None,
query_planner: None,
query_executor: None,
iteration_stage: None,
capabilities: Arc::new(capabilities),
index_stores,
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
))]
temporary_directory,
transaction: None,
isolated: false,
};
if let Some(timeout) = time_out {
ctx.add_timeout(timeout)?;
}
Ok(ctx)
}
pub fn background() -> Self {
Self {
values: HashMap::default(),
parent: None,
deadline: None,
cancelled: Arc::new(AtomicBool::new(false)),
notifications: None,
query_planner: None,
query_executor: None,
iteration_stage: None,
capabilities: Arc::new(Capabilities::default()),
index_stores: IndexStores::default(),
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
))]
temporary_directory: None,
transaction: None,
isolated: false,
}
}
pub fn new(parent: &'a Context) -> Self {
Context {
values: HashMap::default(),
parent: Some(parent),
deadline: parent.deadline,
cancelled: Arc::new(AtomicBool::new(false)),
notifications: parent.notifications.clone(),
query_planner: parent.query_planner,
query_executor: parent.query_executor.clone(),
iteration_stage: parent.iteration_stage.clone(),
capabilities: parent.capabilities.clone(),
index_stores: parent.index_stores.clone(),
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
))]
temporary_directory: parent.temporary_directory.clone(),
transaction: parent.transaction.clone(),
isolated: false,
}
}
pub fn new_isolated(parent: &'a Context) -> Self {
Context {
values: HashMap::default(),
parent: Some(parent),
deadline: parent.deadline,
cancelled: Arc::new(AtomicBool::new(false)),
notifications: parent.notifications.clone(),
query_planner: parent.query_planner,
query_executor: parent.query_executor.clone(),
iteration_stage: parent.iteration_stage.clone(),
capabilities: parent.capabilities.clone(),
index_stores: parent.index_stores.clone(),
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
))]
temporary_directory: parent.temporary_directory.clone(),
transaction: parent.transaction.clone(),
isolated: true,
}
}
pub fn add_value<K, V>(&mut self, key: K, value: V)
where
K: Into<Cow<'static, str>>,
V: Into<Cow<'a, Value>>,
{
self.values.insert(key.into(), value.into());
}
pub fn add_cancel(&mut self) -> Canceller {
let cancelled = self.cancelled.clone();
Canceller::new(cancelled)
}
pub fn add_deadline(&mut self, deadline: Instant) {
match self.deadline {
Some(current) if current < deadline => (),
_ => self.deadline = Some(deadline),
}
}
pub fn add_timeout(&mut self, timeout: Duration) -> Result<(), Error> {
match Instant::now().checked_add(timeout) {
Some(deadline) => {
self.add_deadline(deadline);
Ok(())
}
None => Err(Error::InvalidTimeout(timeout.as_secs())),
}
}
pub fn add_notifications(&mut self, chn: Option<&Sender<Notification>>) {
self.notifications = chn.cloned()
}
pub(crate) fn set_query_planner(&mut self, qp: &'a QueryPlanner) {
self.query_planner = Some(qp);
}
pub(crate) fn set_query_executor(&mut self, qe: QueryExecutor) {
self.query_executor = Some(qe);
}
pub(crate) fn set_iteration_stage(&mut self, is: IterationStage) {
self.iteration_stage = Some(is);
}
pub(crate) fn set_transaction(&mut self, txn: Arc<Transaction>) {
self.transaction = Some(txn);
}
pub(crate) fn with_transaction(mut self, txn: Arc<Transaction>) -> Self {
self.transaction = Some(txn);
self
}
pub(crate) fn tx(&self) -> Arc<Transaction> {
self.transaction
.clone()
.unwrap_or_else(|| unreachable!("The context was not associated with a transaction"))
}
pub fn timeout(&self) -> Option<Duration> {
self.deadline.map(|v| v.saturating_duration_since(Instant::now()))
}
pub fn notifications(&self) -> Option<Sender<Notification>> {
self.notifications.clone()
}
pub(crate) fn get_query_planner(&self) -> Option<&QueryPlanner> {
self.query_planner
}
pub(crate) fn get_query_executor(&self) -> Option<&QueryExecutor> {
self.query_executor.as_ref()
}
pub(crate) fn get_iteration_stage(&self) -> Option<&IterationStage> {
self.iteration_stage.as_ref()
}
pub(crate) fn get_index_stores(&self) -> &IndexStores {
&self.index_stores
}
pub fn done(&self) -> Option<Reason> {
match self.deadline {
Some(deadline) if deadline <= Instant::now() => Some(Reason::Timedout),
_ if self.cancelled.load(Ordering::Relaxed) => Some(Reason::Canceled),
_ => match self.parent {
Some(ctx) => ctx.done(),
_ => None,
},
}
}
pub fn is_ok(&self) -> bool {
self.done().is_none()
}
pub fn is_done(&self) -> bool {
self.done().is_some()
}
pub fn is_timedout(&self) -> bool {
matches!(self.done(), Some(Reason::Timedout))
}
#[cfg(any(
feature = "kv-mem",
feature = "kv-surrealkv",
feature = "kv-rocksdb",
feature = "kv-fdb",
feature = "kv-tikv",
))]
pub fn temporary_directory(&self) -> Option<&Arc<PathBuf>> {
self.temporary_directory.as_ref()
}
pub fn value(&self, key: &str) -> Option<&Value> {
match self.values.get(key) {
Some(v) => match v {
Cow::Borrowed(v) => Some(*v),
Cow::Owned(v) => Some(v),
},
None if !self.isolated => match self.parent {
Some(p) => p.value(key),
_ => None,
},
None => None,
}
}
#[cfg(feature = "scripting")]
pub fn cancellation(&self) -> crate::ctx::cancellation::Cancellation {
crate::ctx::cancellation::Cancellation::new(
self.deadline,
std::iter::successors(Some(self), |ctx| ctx.parent)
.map(|ctx| ctx.cancelled.clone())
.collect(),
)
}
pub fn add_capabilities(&mut self, caps: Capabilities) {
self.capabilities = Arc::new(caps);
}
#[allow(dead_code)]
pub fn get_capabilities(&self) -> Arc<Capabilities> {
self.capabilities.clone()
}
#[allow(dead_code)]
pub fn check_allowed_scripting(&self) -> Result<(), Error> {
if !self.capabilities.allows_scripting() {
return Err(Error::ScriptingNotAllowed);
}
Ok(())
}
pub fn check_allowed_function(&self, target: &str) -> Result<(), Error> {
if !self.capabilities.allows_function_name(target) {
return Err(Error::FunctionNotAllowed(target.to_string()));
}
Ok(())
}
#[cfg(feature = "http")]
pub fn check_allowed_net(&self, target: &Url) -> Result<(), Error> {
match target.host() {
Some(host)
if self.capabilities.allows_network_target(&NetTarget::Host(
host.to_owned(),
target.port_or_known_default(),
)) =>
{
Ok(())
}
_ => Err(Error::NetTargetNotAllowed(target.to_string())),
}
}
}