// surrealdb_core/sql/statements/live.rs
use crate::ctx::Context;
use crate::dbs::Options;
use crate::doc::CursorDoc;
use crate::err::{Error, LiveQueryCause};
use crate::fflags::FFLAGS;
use crate::iam::Auth;
use crate::kvs::lq_structs::{LqEntry, TrackedResult};
use crate::sql::statements::info::InfoStructure;
use crate::sql::{Cond, Fetchs, Fields, Object, Table, Uuid, Value};
use derive::Store;
use futures::lock::MutexGuard;
use reblessive::tree::Stk;
use revision::revisioned;
use serde::{Deserialize, Serialize};
use std::fmt;
/// A `LIVE SELECT` statement: the selected fields, the target it reads from,
/// optional trailing clauses, and the bookkeeping that `compute` attaches
/// before the statement is registered.
///
/// NOTE(review): field order feeds the derived `PartialOrd` and (presumably)
/// the revisioned serialisation layout - do not reorder without confirming.
#[revisioned(revision = 2)]
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[non_exhaustive]
pub struct LiveStatement {
/// Identifier of the live query. The constructors generate a random one,
/// and `compute` returns it to the caller.
pub id: Uuid,
/// Identifier of a node. The constructors seed a random value; on the
/// non-change-feed path `compute` overwrites it with `opt.id()`.
pub node: Uuid,
/// The fields rendered after `LIVE SELECT`.
pub expr: Fields,
/// The target rendered after `FROM`. `compute` evaluates it and rejects
/// anything that does not resolve to a `Value::Table`.
pub what: Value,
/// Optional condition clause, rendered after the target when present.
pub cond: Option<Cond>,
/// Optional fetch clause, rendered last when present.
pub fetch: Option<Fetchs>,
// Set by `archive` to the node id passed to it; `None` otherwise.
pub(crate) archived: Option<Uuid>,
// Filled in by `compute` from the `"session"` value of the context.
// Only this field carries `#[revision(start = 2)]`.
#[revision(start = 2)]
pub(crate) session: Option<Value>,
// Filled in by `compute` with a clone of `opt.auth`.
pub(crate) auth: Option<Auth>,
}
impl LiveStatement {
#[doc(hidden)]
pub fn new(expr: Fields) -> Self {
LiveStatement {
id: Uuid::new_v4(),
node: Uuid::new_v4(),
expr,
..Default::default()
}
}
pub(crate) fn from_source_parts(
expr: Fields,
what: Value,
cond: Option<Cond>,
fetch: Option<Fetchs>,
) -> Self {
LiveStatement {
id: Uuid::new_v4(),
node: Uuid::new_v4(),
expr,
what,
cond,
fetch,
..Default::default()
}
}
pub(crate) async fn compute(
&self,
stk: &mut Stk,
ctx: &Context<'_>,
opt: &Options,
doc: Option<&CursorDoc<'_>>,
) -> Result<Value, Error> {
opt.realtime()?;
opt.valid_for_db()?;
let nid = opt.id()?;
let mut stm = LiveStatement {
session: ctx.value("session").cloned(),
auth: Some(opt.auth.as_ref().clone()),
..self.clone()
};
let id = stm.id.0;
match FFLAGS.change_feed_live_queries.enabled() {
true => {
let mut run = ctx.tx_lock().await;
match stm.what.compute(stk, ctx, opt, doc).await? {
Value::Table(tb) => {
let mut stm = stm;
stm.what = Value::Table(tb.clone());
let ns = opt.ns()?.to_string();
let db = opt.db()?.to_string();
self.validate_change_feed_valid(&mut run, &ns, &db, &tb).await?;
run.pre_commit_register_async_event(TrackedResult::LiveQuery(LqEntry {
live_id: stm.id,
ns,
db,
stm,
}))?;
}
v => {
return Err(Error::LiveStatement {
value: v.to_string(),
});
}
}
Ok(id.into())
}
false => {
let mut run = ctx.tx_lock().await;
match stm.what.compute(stk, ctx, opt, doc).await? {
Value::Table(tb) => {
stm.node = nid.into();
run.putc_ndlq(nid, id, opt.ns()?, opt.db()?, tb.as_str(), None).await?;
run.putc_tblq(opt.ns()?, opt.db()?, &tb, stm, None).await?;
}
v => {
return Err(Error::LiveStatement {
value: v.to_string(),
});
}
};
Ok(id.into())
}
}
}
async fn validate_change_feed_valid(
&self,
tx: &mut MutexGuard<'_, crate::kvs::Transaction>,
ns: &str,
db: &str,
tb: &Table,
) -> Result<(), Error> {
let tb_definition = tx.get_and_cache_tb(ns, db, tb).await.map_err(|e| match e {
Error::TbNotFound {
value: _tb,
} => Error::LiveQueryError(LiveQueryCause::MissingChangeFeed),
_ => e,
})?;
let cf = tb_definition
.changefeed
.ok_or(Error::LiveQueryError(LiveQueryCause::MissingChangeFeed))?;
if !cf.store_diff {
return Err(Error::LiveQueryError(LiveQueryCause::ChangeFeedNoOriginal));
}
Ok(())
}
pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement {
self.archived = Some(node_id);
self
}
}
impl fmt::Display for LiveStatement {
    /// Renders the statement as `LIVE SELECT <expr> FROM <what>`, followed by
    /// the condition and fetch clauses (each prefixed with a single space)
    /// when they are present.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "LIVE SELECT {} FROM {}", self.expr, self.what)?;
        if let Some(cond) = &self.cond {
            write!(f, " {cond}")?;
        }
        if let Some(fetch) = &self.fetch {
            write!(f, " {fetch}")?;
        }
        Ok(())
    }
}
impl InfoStructure for LiveStatement {
    /// Converts the statement into an object with `expr` and `what` keys,
    /// plus `cond` and `fetch` keys when those clauses are present. The
    /// remaining fields are not included.
    fn structure(self) -> Value {
        let mut info = Object::default();
        info.insert("expr".to_string(), self.expr.structure());
        info.insert("what".to_string(), self.what.structure());
        // Optional clauses only appear when they are set.
        if let Some(condition) = self.cond {
            info.insert("cond".to_string(), condition.structure());
        }
        if let Some(fetches) = self.fetch {
            info.insert("fetch".to_string(), fetches.structure());
        }
        Value::Object(info)
    }
}