surrealdb_core/sql/statements/define/
table.rs

1use super::DefineFieldStatement;
2use crate::ctx::Context;
3use crate::dbs::{Force, Options};
4use crate::doc::CursorDoc;
5use crate::err::Error;
6use crate::iam::{Action, ResourceKind};
7use crate::kvs::Transaction;
8use crate::sql::fmt::{is_pretty, pretty_indent};
9use crate::sql::paths::{IN, OUT};
10use crate::sql::statements::info::InfoStructure;
11use crate::sql::{
12	changefeed::ChangeFeed, statements::UpdateStatement, Base, Ident, Output, Permissions, Strand,
13	Value, Values, View,
14};
15use crate::sql::{Idiom, Kind, TableType};
16
17use reblessive::tree::Stk;
18use revision::revisioned;
19use revision::Error as RevisionError;
20use serde::{Deserialize, Serialize};
21use std::fmt::{self, Display, Write};
22use std::sync::Arc;
23use uuid::Uuid;
24
/// The definition of a table, rendered by `Display` as a `DEFINE TABLE` statement.
///
/// The struct is versioned through `revisioned`: a field tagged
/// `#[revision(start = N)]` was introduced at revision `N` of the layout.
#[revisioned(revision = 6)]
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[non_exhaustive]
pub struct DefineTableStatement {
	/// Numeric table id; `compute` allocates one when the statement has none
	/// and both the namespace and the database carry an id
	pub id: Option<u32>,
	/// The name of the table
	pub name: Ident,
	/// Rendered as the `DROP` clause
	pub drop: bool,
	/// Rendered as `SCHEMAFULL` when true, `SCHEMALESS` otherwise
	pub full: bool,
	/// The view definition, when this table is a view over other tables
	pub view: Option<View>,
	/// The permissions clause of the table
	pub permissions: Permissions,
	/// The changefeed clause; when set, `compute` records the definition change
	pub changefeed: Option<ChangeFeed>,
	/// Rendered as the `COMMENT` clause
	pub comment: Option<Strand>,
	/// Skip the statement silently if the table already exists (`IF NOT EXISTS`)
	#[revision(start = 2)]
	pub if_not_exists: bool,
	/// The table type (`NORMAL`, `RELATION` or `ANY`)
	#[revision(start = 3)]
	pub kind: TableType,
	/// Should we overwrite the table definition if it already exists
	#[revision(start = 4)]
	pub overwrite: bool,
	/// The last time that a DEFINE FIELD was added to this table
	/// (also refreshed when the `in` / `out` relation fields are written)
	#[revision(start = 5)]
	pub cache_fields_ts: Uuid,
	/// The last time that a DEFINE EVENT was added to this table
	#[revision(start = 5)]
	pub cache_events_ts: Uuid,
	/// The last time that a view (a DEFINE TABLE ... AS statement) was
	/// defined over this table; `compute` refreshes it on each source table
	#[revision(start = 5)]
	pub cache_tables_ts: Uuid,
	/// The last time that a DEFINE INDEX was added to this table
	#[revision(start = 5)]
	pub cache_indexes_ts: Uuid,
	/// The last time that a LIVE query was added to this table.
	/// Only part of revision 5 (`end = 6`); stored values are handed to
	/// `convert_cache_ts`, which ignores them.
	#[revision(start = 5, end = 6, convert_fn = "convert_cache_ts")]
	pub cache_lives_ts: Uuid,
}
61
62impl DefineTableStatement {
63	pub(crate) async fn compute(
64		&self,
65		stk: &mut Stk,
66		ctx: &Context,
67		opt: &Options,
68		doc: Option<&CursorDoc>,
69	) -> Result<Value, Error> {
70		// Allowed to run?
71		opt.is_allowed(Action::Edit, ResourceKind::Table, &Base::Db)?;
72		// Get the NS and DB
73		let (ns, db) = opt.ns_db()?;
74		// Fetch the transaction
75		let txn = ctx.tx();
76		// Check if the definition exists
77		if txn.get_tb(ns, db, &self.name).await.is_ok() {
78			if self.if_not_exists {
79				return Ok(Value::None);
80			} else if !self.overwrite {
81				return Err(Error::TbAlreadyExists {
82					name: self.name.to_string(),
83				});
84			}
85		}
86		// Process the statement
87		let key = crate::key::database::tb::new(ns, db, &self.name);
88		let nsv = txn.get_or_add_ns(ns, opt.strict).await?;
89		let dbv = txn.get_or_add_db(ns, db, opt.strict).await?;
90		let mut dt = DefineTableStatement {
91			id: if self.id.is_none() && nsv.id.is_some() && dbv.id.is_some() {
92				Some(txn.lock().await.get_next_tb_id(nsv.id.unwrap(), dbv.id.unwrap()).await?)
93			} else {
94				None
95			},
96			// Don't persist the `IF NOT EXISTS` clause to schema
97			if_not_exists: false,
98			overwrite: false,
99			..self.clone()
100		};
101		// Add table relational fields
102		Self::add_in_out_fields(&txn, ns, db, &mut dt).await?;
103		// Set the table definition
104		txn.set(key, revision::to_vec(&dt)?, None).await?;
105		// Clear the cache
106		if let Some(cache) = ctx.get_cache() {
107			cache.clear_tb(ns, db, &self.name);
108		}
109		// Clear the cache
110		txn.clear();
111		// Record definition change
112		if dt.changefeed.is_some() {
113			txn.lock().await.record_table_change(ns, db, &self.name, &dt);
114		}
115		// Check if table is a view
116		if let Some(view) = &self.view {
117			// Force queries to run
118			let opt = &opt.new_with_force(Force::Table(Arc::new([dt])));
119			// Remove the table data
120			let key = crate::key::table::all::new(ns, db, &self.name);
121			txn.delp(key).await?;
122			// Process each foreign table
123			for ft in view.what.0.iter() {
124				// Save the view config
125				let key = crate::key::table::ft::new(ns, db, ft, &self.name);
126				txn.set(key, revision::to_vec(self)?, None).await?;
127				// Refresh the table cache
128				let key = crate::key::database::tb::new(ns, db, ft);
129				let tb = txn.get_tb(ns, db, ft).await?;
130				txn.set(
131					key,
132					revision::to_vec(&DefineTableStatement {
133						cache_tables_ts: Uuid::now_v7(),
134						..tb.as_ref().clone()
135					})?,
136					None,
137				)
138				.await?;
139				// Clear the cache
140				if let Some(cache) = ctx.get_cache() {
141					cache.clear_tb(ns, db, ft);
142				}
143				// Clear the cache
144				txn.clear();
145				// Process the view data
146				let stm = UpdateStatement {
147					what: Values(vec![Value::Table(ft.clone())]),
148					output: Some(Output::None),
149					..UpdateStatement::default()
150				};
151				stm.compute(stk, ctx, opt, doc).await?;
152			}
153		}
154		// Clear the cache
155		if let Some(cache) = ctx.get_cache() {
156			cache.clear_tb(ns, db, &self.name);
157		}
158		// Clear the cache
159		txn.clear();
160		// Ok all good
161		Ok(Value::None)
162	}
163
164	fn convert_cache_ts(&self, _revision: u16, _value: Uuid) -> Result<(), RevisionError> {
165		Ok(())
166	}
167}
168
169impl DefineTableStatement {
170	/// Checks if this is a TYPE RELATION table
171	pub fn is_relation(&self) -> bool {
172		matches!(self.kind, TableType::Relation(_))
173	}
174	/// Checks if this table allows graph edges / relations
175	pub fn allows_relation(&self) -> bool {
176		matches!(self.kind, TableType::Relation(_) | TableType::Any)
177	}
178	/// Checks if this table allows normal records / documents
179	pub fn allows_normal(&self) -> bool {
180		matches!(self.kind, TableType::Normal | TableType::Any)
181	}
182	/// Used to add relational fields to existing table records
183	pub async fn add_in_out_fields(
184		txn: &Transaction,
185		ns: &str,
186		db: &str,
187		tb: &mut DefineTableStatement,
188	) -> Result<(), Error> {
189		// Add table relational fields
190		if let TableType::Relation(rel) = &tb.kind {
191			// Set the `in` field as a DEFINE FIELD definition
192			{
193				let key = crate::key::table::fd::new(ns, db, &tb.name, "in");
194				let val = rel.from.clone().unwrap_or(Kind::Record(vec![]));
195				txn.set(
196					key,
197					revision::to_vec(&DefineFieldStatement {
198						name: Idiom::from(IN.to_vec()),
199						what: tb.name.to_owned(),
200						kind: Some(val),
201						..Default::default()
202					})?,
203					None,
204				)
205				.await?;
206			}
207			// Set the `out` field as a DEFINE FIELD definition
208			{
209				let key = crate::key::table::fd::new(ns, db, &tb.name, "out");
210				let val = rel.to.clone().unwrap_or(Kind::Record(vec![]));
211				txn.set(
212					key,
213					revision::to_vec(&DefineFieldStatement {
214						name: Idiom::from(OUT.to_vec()),
215						what: tb.name.to_owned(),
216						kind: Some(val),
217						..Default::default()
218					})?,
219					None,
220				)
221				.await?;
222			}
223			// Refresh the table cache for the fields
224			tb.cache_fields_ts = Uuid::now_v7();
225		}
226		Ok(())
227	}
228}
229
230impl Display for DefineTableStatement {
231	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
232		write!(f, "DEFINE TABLE")?;
233		if self.if_not_exists {
234			write!(f, " IF NOT EXISTS")?
235		}
236		if self.overwrite {
237			write!(f, " OVERWRITE")?
238		}
239		write!(f, " {}", self.name)?;
240		write!(f, " TYPE")?;
241		match &self.kind {
242			TableType::Normal => {
243				f.write_str(" NORMAL")?;
244			}
245			TableType::Relation(rel) => {
246				f.write_str(" RELATION")?;
247				if let Some(Kind::Record(kind)) = &rel.from {
248					write!(
249						f,
250						" IN {}",
251						kind.iter().map(|t| t.0.as_str()).collect::<Vec<_>>().join(" | ")
252					)?;
253				}
254				if let Some(Kind::Record(kind)) = &rel.to {
255					write!(
256						f,
257						" OUT {}",
258						kind.iter().map(|t| t.0.as_str()).collect::<Vec<_>>().join(" | ")
259					)?;
260				}
261				if rel.enforced {
262					write!(f, " ENFORCED")?;
263				}
264			}
265			TableType::Any => {
266				f.write_str(" ANY")?;
267			}
268		}
269		if self.drop {
270			f.write_str(" DROP")?;
271		}
272		f.write_str(if self.full {
273			" SCHEMAFULL"
274		} else {
275			" SCHEMALESS"
276		})?;
277		if let Some(ref v) = self.comment {
278			write!(f, " COMMENT {v}")?
279		}
280		if let Some(ref v) = self.view {
281			write!(f, " {v}")?
282		}
283		if let Some(ref v) = self.changefeed {
284			write!(f, " {v}")?;
285		}
286		let _indent = if is_pretty() {
287			Some(pretty_indent())
288		} else {
289			f.write_char(' ')?;
290			None
291		};
292		write!(f, "{}", self.permissions)?;
293		Ok(())
294	}
295}
296
297impl InfoStructure for DefineTableStatement {
298	fn structure(self) -> Value {
299		Value::from(map! {
300			"name".to_string() => self.name.structure(),
301			"drop".to_string() => self.drop.into(),
302			"full".to_string() => self.full.into(),
303			"kind".to_string() => self.kind.structure(),
304			"view".to_string(), if let Some(v) = self.view => v.structure(),
305			"changefeed".to_string(), if let Some(v) = self.changefeed => v.structure(),
306			"permissions".to_string() => self.permissions.structure(),
307			"comment".to_string(), if let Some(v) = self.comment => v.into(),
308		})
309	}
310}