surrealdb_core/sql/statements/
live.rs1use crate::ctx::Context;
2use crate::dbs::Options;
3use crate::doc::CursorDoc;
4use crate::err::Error;
5use crate::iam::Auth;
6use crate::kvs::Live;
7use crate::sql::statements::info::InfoStructure;
8use crate::sql::{Cond, Fetchs, Fields, Uuid, Value};
9
10use reblessive::tree::Stk;
11use revision::revisioned;
12use serde::{Deserialize, Serialize};
13use std::fmt;
14
15#[revisioned(revision = 1)]
16#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
17#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
18#[non_exhaustive]
19pub struct LiveStatement {
20 pub id: Uuid,
21 pub node: Uuid,
22 pub expr: Fields,
23 pub what: Value,
24 pub cond: Option<Cond>,
25 pub fetch: Option<Fetchs>,
26 pub(crate) auth: Option<Auth>,
32 pub(crate) session: Option<Value>,
38}
39
40impl LiveStatement {
41 pub fn new(expr: Fields) -> Self {
42 LiveStatement {
43 id: Uuid::new_v4(),
44 node: Uuid::new_v4(),
45 expr,
46 ..Default::default()
47 }
48 }
49
50 pub fn new_from_what_expr(expr: Fields, what: Value) -> Self {
51 LiveStatement {
52 id: Uuid::new_v4(),
53 node: Uuid::new_v4(),
54 what,
55 expr,
56 ..Default::default()
57 }
58 }
59
60 pub(crate) fn from_source_parts(
62 expr: Fields,
63 what: Value,
64 cond: Option<Cond>,
65 fetch: Option<Fetchs>,
66 ) -> Self {
67 LiveStatement {
68 id: Uuid::new_v4(),
69 node: Uuid::new_v4(),
70 expr,
71 what,
72 cond,
73 fetch,
74 ..Default::default()
75 }
76 }
77
78 pub(crate) async fn compute(
80 &self,
81 stk: &mut Stk,
82 ctx: &Context,
83 opt: &Options,
84 doc: Option<&CursorDoc>,
85 ) -> Result<Value, Error> {
86 opt.realtime()?;
88 opt.valid_for_db()?;
90 let nid = opt.id()?;
92 let mut stm = LiveStatement {
94 auth: Some(opt.auth.as_ref().clone()),
97 session: ctx.value("session").cloned(),
100 ..self.clone()
103 };
104 let id = stm.id.0;
106 match stm.what.compute(stk, ctx, opt, doc).await? {
108 Value::Table(tb) => {
109 stm.node = nid.into();
111 let (ns, db) = opt.ns_db()?;
113 let lq = Live {
115 ns: ns.to_string(),
116 db: db.to_string(),
117 tb: tb.to_string(),
118 };
119 let txn = ctx.tx();
121 txn.ensure_ns_db_tb(ns, db, &tb, opt.strict).await?;
123 let key = crate::key::node::lq::new(nid, id);
125 txn.replace(key, revision::to_vec(&lq)?).await?;
126 let key = crate::key::table::lq::new(ns, db, &tb, id);
128 txn.replace(key, revision::to_vec(&stm)?).await?;
129 if let Some(cache) = ctx.get_cache() {
131 cache.new_live_queries_version(ns, db, &tb);
132 }
133 txn.clear();
135 }
136 v => {
137 return Err(Error::LiveStatement {
138 value: v.to_string(),
139 });
140 }
141 };
142 Ok(id.into())
144 }
145}
146
147impl fmt::Display for LiveStatement {
148 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
149 write!(f, "LIVE SELECT {} FROM {}", self.expr, self.what)?;
150 if let Some(ref v) = self.cond {
151 write!(f, " {v}")?
152 }
153 if let Some(ref v) = self.fetch {
154 write!(f, " {v}")?
155 }
156 Ok(())
157 }
158}
159
160impl InfoStructure for LiveStatement {
161 fn structure(self) -> Value {
162 Value::from(map! {
163 "expr".to_string() => self.expr.structure(),
164 "what".to_string() => self.what.structure(),
165 "cond".to_string(), if let Some(v) = self.cond => v.structure(),
166 "fetch".to_string(), if let Some(v) = self.fetch => v.structure(),
167 })
168 }
169}
170
171#[cfg(test)]
172mod tests {
173 use crate::dbs::{Action, Capabilities, Notification, Session};
174 use crate::kvs::Datastore;
175 use crate::kvs::LockType::Optimistic;
176 use crate::kvs::TransactionType::Write;
177 use crate::sql::Thing;
178 use crate::sql::Value;
179 use crate::syn::Parse;
180
181 pub async fn new_ds() -> Result<Datastore, crate::err::Error> {
182 Ok(Datastore::new("memory")
183 .await?
184 .with_capabilities(Capabilities::all())
185 .with_notifications())
186 }
187
188 #[tokio::test]
189 async fn test_table_definition_is_created_for_live_query() {
190 let dbs = new_ds().await.unwrap().with_notifications();
191 let (ns, db, tb) = ("test", "test", "person");
192 let ses = Session::owner().with_ns(ns).with_db(db).with_rt(true);
193
194 let tx = dbs.transaction(Write, Optimistic).await.unwrap();
196 let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
197 assert!(table_occurrences.is_empty());
198 tx.cancel().await.unwrap();
199
200 let lq_stmt = format!("LIVE SELECT * FROM {}", tb);
202 let live_query_response = &mut dbs.execute(&lq_stmt, &ses, None).await.unwrap();
203
204 let live_id = live_query_response.remove(0).result.unwrap();
205 let live_id = match live_id {
206 Value::Uuid(id) => id,
207 _ => panic!("expected uuid"),
208 };
209
210 let tx = dbs.transaction(Write, Optimistic).await.unwrap();
212 let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
213 assert_eq!(table_occurrences.len(), 1);
214 assert_eq!(table_occurrences[0].name.0, tb);
215 tx.cancel().await.unwrap();
216
217 let create_statement = format!("CREATE {tb}:test_true SET condition = true");
219 let create_response = &mut dbs.execute(&create_statement, &ses, None).await.unwrap();
220 assert_eq!(create_response.len(), 1);
221 let expected_record = Value::parse(&format!(
222 "[{{
223 id: {tb}:test_true,
224 condition: true,
225 }}]"
226 ));
227
228 let tmp = create_response.remove(0).result.unwrap();
229 assert_eq!(tmp, expected_record);
230
231 let tx = dbs.transaction(Write, Optimistic).await.unwrap();
233 let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
234 assert_eq!(table_occurrences.len(), 1);
235 assert_eq!(table_occurrences[0].name.0, tb);
236 tx.cancel().await.unwrap();
237
238 let notifications = dbs.notifications().expect("expected notifications");
240 let notification = notifications.recv().await.unwrap();
241 assert_eq!(
242 notification,
243 Notification::new(
244 live_id,
245 Action::Create,
246 Value::Thing(Thing::from((tb, "test_true"))),
247 Value::parse(&format!(
248 "{{
249 id: {tb}:test_true,
250 condition: true,
251 }}"
252 ),),
253 )
254 );
255 }
256
257 #[tokio::test]
258 async fn test_table_exists_for_live_query() {
259 let dbs = new_ds().await.unwrap().with_notifications();
260 let (ns, db, tb) = ("test", "test", "person");
261 let ses = Session::owner().with_ns(ns).with_db(db).with_rt(true);
262
263 let tx = dbs.transaction(Write, Optimistic).await.unwrap();
265 let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
266 assert!(table_occurrences.is_empty());
267 tx.cancel().await.unwrap();
268
269 let create_statement = format!("CREATE {}:test_true SET condition = true", tb);
271 dbs.execute(&create_statement, &ses, None).await.unwrap();
272
273 let tx = dbs.transaction(Write, Optimistic).await.unwrap();
275 let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
276 assert_eq!(table_occurrences.len(), 1);
277 assert_eq!(table_occurrences[0].name.0, tb);
278 tx.cancel().await.unwrap();
279
280 let lq_stmt = format!("LIVE SELECT * FROM {}", tb);
282 dbs.execute(&lq_stmt, &ses, None).await.unwrap();
283
284 let tx = dbs.transaction(Write, Optimistic).await.unwrap();
286 let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
287 assert_eq!(table_occurrences.len(), 1);
288 assert_eq!(table_occurrences[0].name.0, tb);
289 tx.cancel().await.unwrap();
290 }
291}