surrealdb_core/sql/statements/
live.rs

1use crate::ctx::Context;
2use crate::dbs::Options;
3use crate::doc::CursorDoc;
4use crate::err::Error;
5use crate::iam::Auth;
6use crate::kvs::Live;
7use crate::sql::statements::info::InfoStructure;
8use crate::sql::{Cond, Fetchs, Fields, Uuid, Value};
9
10use reblessive::tree::Stk;
11use revision::revisioned;
12use serde::{Deserialize, Serialize};
13use std::fmt;
14
15#[revisioned(revision = 1)]
16#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
17#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
18#[non_exhaustive]
19pub struct LiveStatement {
20	pub id: Uuid,
21	pub node: Uuid,
22	pub expr: Fields,
23	pub what: Value,
24	pub cond: Option<Cond>,
25	pub fetch: Option<Fetchs>,
26	// When a live query is created, we must also store the
27	// authenticated session of the user who made the query,
28	// so we can check it later when sending notifications.
29	// This is optional as it is only set by the database
30	// runtime when storing the live query to storage.
31	pub(crate) auth: Option<Auth>,
32	// When a live query is created, we must also store the
33	// authenticated session of the user who made the query,
34	// so we can check it later when sending notifications.
35	// This is optional as it is only set by the database
36	// runtime when storing the live query to storage.
37	pub(crate) session: Option<Value>,
38}
39
40impl LiveStatement {
41	pub fn new(expr: Fields) -> Self {
42		LiveStatement {
43			id: Uuid::new_v4(),
44			node: Uuid::new_v4(),
45			expr,
46			..Default::default()
47		}
48	}
49
50	pub fn new_from_what_expr(expr: Fields, what: Value) -> Self {
51		LiveStatement {
52			id: Uuid::new_v4(),
53			node: Uuid::new_v4(),
54			what,
55			expr,
56			..Default::default()
57		}
58	}
59
60	/// Creates a live statement from parts that can be set during a query.
61	pub(crate) fn from_source_parts(
62		expr: Fields,
63		what: Value,
64		cond: Option<Cond>,
65		fetch: Option<Fetchs>,
66	) -> Self {
67		LiveStatement {
68			id: Uuid::new_v4(),
69			node: Uuid::new_v4(),
70			expr,
71			what,
72			cond,
73			fetch,
74			..Default::default()
75		}
76	}
77
78	/// Process this type returning a computed simple Value
79	pub(crate) async fn compute(
80		&self,
81		stk: &mut Stk,
82		ctx: &Context,
83		opt: &Options,
84		doc: Option<&CursorDoc>,
85	) -> Result<Value, Error> {
86		// Is realtime enabled?
87		opt.realtime()?;
88		// Valid options?
89		opt.valid_for_db()?;
90		// Get the Node ID
91		let nid = opt.id()?;
92		// Check that auth has been set
93		let mut stm = LiveStatement {
94			// Use the current session authentication
95			// for when we store the LIVE Statement
96			auth: Some(opt.auth.as_ref().clone()),
97			// Use the current session authentication
98			// for when we store the LIVE Statement
99			session: ctx.value("session").cloned(),
100			// Clone the rest of the original fields
101			// from the LIVE statement to the new one
102			..self.clone()
103		};
104		// Get the id
105		let id = stm.id.0;
106		// Process the live query table
107		match stm.what.compute(stk, ctx, opt, doc).await? {
108			Value::Table(tb) => {
109				// Store the current Node ID
110				stm.node = nid.into();
111				// Get the NS and DB
112				let (ns, db) = opt.ns_db()?;
113				// Store the live info
114				let lq = Live {
115					ns: ns.to_string(),
116					db: db.to_string(),
117					tb: tb.to_string(),
118				};
119				// Get the transaction
120				let txn = ctx.tx();
121				// Ensure that the table definition exists
122				txn.ensure_ns_db_tb(ns, db, &tb, opt.strict).await?;
123				// Insert the node live query
124				let key = crate::key::node::lq::new(nid, id);
125				txn.replace(key, revision::to_vec(&lq)?).await?;
126				// Insert the table live query
127				let key = crate::key::table::lq::new(ns, db, &tb, id);
128				txn.replace(key, revision::to_vec(&stm)?).await?;
129				// Refresh the table cache for lives
130				if let Some(cache) = ctx.get_cache() {
131					cache.new_live_queries_version(ns, db, &tb);
132				}
133				// Clear the cache
134				txn.clear();
135			}
136			v => {
137				return Err(Error::LiveStatement {
138					value: v.to_string(),
139				});
140			}
141		};
142		// Return the query id
143		Ok(id.into())
144	}
145}
146
147impl fmt::Display for LiveStatement {
148	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
149		write!(f, "LIVE SELECT {} FROM {}", self.expr, self.what)?;
150		if let Some(ref v) = self.cond {
151			write!(f, " {v}")?
152		}
153		if let Some(ref v) = self.fetch {
154			write!(f, " {v}")?
155		}
156		Ok(())
157	}
158}
159
160impl InfoStructure for LiveStatement {
161	fn structure(self) -> Value {
162		Value::from(map! {
163			"expr".to_string() => self.expr.structure(),
164			"what".to_string() => self.what.structure(),
165			"cond".to_string(), if let Some(v) = self.cond => v.structure(),
166			"fetch".to_string(), if let Some(v) = self.fetch => v.structure(),
167		})
168	}
169}
170
171#[cfg(test)]
172mod tests {
173	use crate::dbs::{Action, Capabilities, Notification, Session};
174	use crate::kvs::Datastore;
175	use crate::kvs::LockType::Optimistic;
176	use crate::kvs::TransactionType::Write;
177	use crate::sql::Thing;
178	use crate::sql::Value;
179	use crate::syn::Parse;
180
181	pub async fn new_ds() -> Result<Datastore, crate::err::Error> {
182		Ok(Datastore::new("memory")
183			.await?
184			.with_capabilities(Capabilities::all())
185			.with_notifications())
186	}
187
188	#[tokio::test]
189	async fn test_table_definition_is_created_for_live_query() {
190		let dbs = new_ds().await.unwrap().with_notifications();
191		let (ns, db, tb) = ("test", "test", "person");
192		let ses = Session::owner().with_ns(ns).with_db(db).with_rt(true);
193
194		// Create a new transaction and verify that there are no tables defined.
195		let tx = dbs.transaction(Write, Optimistic).await.unwrap();
196		let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
197		assert!(table_occurrences.is_empty());
198		tx.cancel().await.unwrap();
199
200		// Initiate a live query statement
201		let lq_stmt = format!("LIVE SELECT * FROM {}", tb);
202		let live_query_response = &mut dbs.execute(&lq_stmt, &ses, None).await.unwrap();
203
204		let live_id = live_query_response.remove(0).result.unwrap();
205		let live_id = match live_id {
206			Value::Uuid(id) => id,
207			_ => panic!("expected uuid"),
208		};
209
210		// Verify that the table definition has been created.
211		let tx = dbs.transaction(Write, Optimistic).await.unwrap();
212		let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
213		assert_eq!(table_occurrences.len(), 1);
214		assert_eq!(table_occurrences[0].name.0, tb);
215		tx.cancel().await.unwrap();
216
217		// Initiate a Create record
218		let create_statement = format!("CREATE {tb}:test_true SET condition = true");
219		let create_response = &mut dbs.execute(&create_statement, &ses, None).await.unwrap();
220		assert_eq!(create_response.len(), 1);
221		let expected_record = Value::parse(&format!(
222			"[{{
223				id: {tb}:test_true,
224				condition: true,
225			}}]"
226		));
227
228		let tmp = create_response.remove(0).result.unwrap();
229		assert_eq!(tmp, expected_record);
230
231		// Create a new transaction to verify that the same table was used.
232		let tx = dbs.transaction(Write, Optimistic).await.unwrap();
233		let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
234		assert_eq!(table_occurrences.len(), 1);
235		assert_eq!(table_occurrences[0].name.0, tb);
236		tx.cancel().await.unwrap();
237
238		// Validate notification
239		let notifications = dbs.notifications().expect("expected notifications");
240		let notification = notifications.recv().await.unwrap();
241		assert_eq!(
242			notification,
243			Notification::new(
244				live_id,
245				Action::Create,
246				Value::Thing(Thing::from((tb, "test_true"))),
247				Value::parse(&format!(
248					"{{
249						id: {tb}:test_true,
250						condition: true,
251					}}"
252				),),
253			)
254		);
255	}
256
257	#[tokio::test]
258	async fn test_table_exists_for_live_query() {
259		let dbs = new_ds().await.unwrap().with_notifications();
260		let (ns, db, tb) = ("test", "test", "person");
261		let ses = Session::owner().with_ns(ns).with_db(db).with_rt(true);
262
263		// Create a new transaction and verify that there are no tables defined.
264		let tx = dbs.transaction(Write, Optimistic).await.unwrap();
265		let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
266		assert!(table_occurrences.is_empty());
267		tx.cancel().await.unwrap();
268
269		// Initiate a Create record
270		let create_statement = format!("CREATE {}:test_true SET condition = true", tb);
271		dbs.execute(&create_statement, &ses, None).await.unwrap();
272
273		// Create a new transaction and confirm that a new table is created.
274		let tx = dbs.transaction(Write, Optimistic).await.unwrap();
275		let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
276		assert_eq!(table_occurrences.len(), 1);
277		assert_eq!(table_occurrences[0].name.0, tb);
278		tx.cancel().await.unwrap();
279
280		// Initiate a live query statement
281		let lq_stmt = format!("LIVE SELECT * FROM {}", tb);
282		dbs.execute(&lq_stmt, &ses, None).await.unwrap();
283
284		// Verify that the old table definition was used.
285		let tx = dbs.transaction(Write, Optimistic).await.unwrap();
286		let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
287		assert_eq!(table_occurrences.len(), 1);
288		assert_eq!(table_occurrences[0].name.0, tb);
289		tx.cancel().await.unwrap();
290	}
291}