surrealdb_core/sql/statements/define/
field.rs

1use crate::ctx::Context;
2use crate::dbs::capabilities::ExperimentalTarget;
3use crate::dbs::Options;
4use crate::doc::CursorDoc;
5use crate::err::Error;
6use crate::iam::{Action, ResourceKind};
7use crate::sql::fmt::{is_pretty, pretty_indent};
8use crate::sql::reference::Reference;
9use crate::sql::statements::info::InfoStructure;
10use crate::sql::statements::DefineTableStatement;
11use crate::sql::{Base, Ident, Idiom, Kind, Permissions, Strand, Value};
12use crate::sql::{Literal, Part};
13use crate::sql::{Relation, TableType};
14
15use revision::revisioned;
16use serde::{Deserialize, Serialize};
17use std::fmt::{self, Display, Write};
18use uuid::Uuid;
19
20#[revisioned(revision = 6)]
21#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
22#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
23#[non_exhaustive]
24pub struct DefineFieldStatement {
25	pub name: Idiom,
26	pub what: Ident,
27	pub flex: bool,
28	pub kind: Option<Kind>,
29	#[revision(start = 2)]
30	pub readonly: bool,
31	pub value: Option<Value>,
32	pub assert: Option<Value>,
33	pub default: Option<Value>,
34	pub permissions: Permissions,
35	pub comment: Option<Strand>,
36	#[revision(start = 3)]
37	pub if_not_exists: bool,
38	#[revision(start = 4)]
39	pub overwrite: bool,
40	#[revision(start = 5)]
41	pub reference: Option<Reference>,
42	#[revision(start = 6)]
43	pub default_always: bool,
44}
45
46impl DefineFieldStatement {
47	/// Process this type returning a computed simple Value
48	pub(crate) async fn compute(
49		&self,
50		ctx: &Context,
51		opt: &Options,
52		_doc: Option<&CursorDoc>,
53	) -> Result<Value, Error> {
54		// Allowed to run?
55		opt.is_allowed(Action::Edit, ResourceKind::Field, &Base::Db)?;
56		// Validate reference options
57		self.validate_reference_options(ctx)?;
58		// Correct reference type
59		let kind = if let Some(kind) = self.correct_reference_type(ctx, opt).await? {
60			Some(kind)
61		} else {
62			self.kind.clone()
63		};
64		// Disallow mismatched types
65		self.disallow_mismatched_types(ctx, opt).await?;
66		// Get the NS and DB
67		let (ns, db) = opt.ns_db()?;
68		// Fetch the transaction
69		let txn = ctx.tx();
70		// Get the name of the field
71		let fd = self.name.to_string();
72		// Check if the definition exists
73		if txn.get_tb_field(ns, db, &self.what, &fd).await.is_ok() {
74			if self.if_not_exists {
75				return Ok(Value::None);
76			} else if !self.overwrite {
77				return Err(Error::FdAlreadyExists {
78					name: fd,
79				});
80			}
81		}
82		// Process the statement
83		let key = crate::key::table::fd::new(ns, db, &self.what, &fd);
84		txn.get_or_add_ns(ns, opt.strict).await?;
85		txn.get_or_add_db(ns, db, opt.strict).await?;
86		txn.get_or_add_tb(ns, db, &self.what, opt.strict).await?;
87		txn.set(
88			key,
89			revision::to_vec(&DefineFieldStatement {
90				// Don't persist the `IF NOT EXISTS` clause to schema
91				if_not_exists: false,
92				overwrite: false,
93				kind,
94				..self.clone()
95			})?,
96			None,
97		)
98		.await?;
99		// Refresh the table cache
100		let key = crate::key::database::tb::new(ns, db, &self.what);
101		let tb = txn.get_tb(ns, db, &self.what).await?;
102		txn.set(
103			key,
104			revision::to_vec(&DefineTableStatement {
105				cache_fields_ts: Uuid::now_v7(),
106				..tb.as_ref().clone()
107			})?,
108			None,
109		)
110		.await?;
111		// Clear the cache
112		if let Some(cache) = ctx.get_cache() {
113			cache.clear_tb(ns, db, &self.what);
114		}
115		// Clear the cache
116		txn.clear();
117		// Find all existing field definitions
118		let fields = txn.all_tb_fields(ns, db, &self.what, None).await.ok();
119		// Process possible recursive_definitions
120		if let Some(mut cur_kind) = self.kind.as_ref().and_then(|x| x.inner_kind()) {
121			let mut name = self.name.clone();
122			loop {
123				// Check if the subtype is an `any` type
124				if let Kind::Any = cur_kind {
125					// There is no need to add a subtype
126					// field definition if the type is
127					// just specified as an `array`. This
128					// is because the following query:
129					//  DEFINE FIELD foo ON bar TYPE array;
130					// already implies that the immediate
131					// subtype is an any:
132					//  DEFINE FIELD foo[*] ON bar TYPE any;
133					// so we skip the subtype field.
134					break;
135				}
136				// Get the kind of this sub field
137				let new_kind = cur_kind.inner_kind();
138				// Add a new subtype
139				name.0.push(Part::All);
140				// Get the field name
141				let fd = name.to_string();
142				// Set the subtype `DEFINE FIELD` definition
143				let key = crate::key::table::fd::new(ns, db, &self.what, &fd);
144				let val = if let Some(existing) =
145					fields.as_ref().and_then(|x| x.iter().find(|x| x.name == name))
146				{
147					DefineFieldStatement {
148						kind: Some(cur_kind),
149						reference: self.reference.clone(),
150						if_not_exists: false,
151						overwrite: false,
152						..existing.clone()
153					}
154				} else {
155					DefineFieldStatement {
156						name: name.clone(),
157						what: self.what.clone(),
158						flex: self.flex,
159						kind: Some(cur_kind),
160						reference: self.reference.clone(),
161						..Default::default()
162					}
163				};
164				txn.set(key, revision::to_vec(&val)?, None).await?;
165				// Process to any sub field
166				if let Some(new_kind) = new_kind {
167					cur_kind = new_kind;
168				} else {
169					break;
170				}
171			}
172		}
173		// If this is an `in` field then check relation definitions
174		if fd.as_str() == "in" {
175			// Get the table definition that this field belongs to
176			let tb = txn.get_tb(ns, db, &self.what).await?;
177			// The table is marked as TYPE RELATION
178			if let TableType::Relation(ref relation) = tb.kind {
179				// Check if a field TYPE has been specified
180				if let Some(kind) = self.kind.as_ref() {
181					// The `in` field must be a record type
182					if !kind.is_record() {
183						return Err(Error::Thrown(
184							"in field on a relation must be a record".into(),
185						));
186					}
187					// Add the TYPE to the DEFINE TABLE statement
188					if relation.from.as_ref() != self.kind.as_ref() {
189						let key = crate::key::database::tb::new(ns, db, &self.what);
190						let val = DefineTableStatement {
191							cache_fields_ts: Uuid::now_v7(),
192							kind: TableType::Relation(Relation {
193								from: self.kind.to_owned(),
194								..relation.to_owned()
195							}),
196							..tb.as_ref().to_owned()
197						};
198						txn.set(key, revision::to_vec(&val)?, None).await?;
199						// Clear the cache
200						if let Some(cache) = ctx.get_cache() {
201							cache.clear_tb(ns, db, &self.what);
202						}
203						// Clear the cache
204						txn.clear();
205					}
206				}
207			}
208		}
209		// If this is an `out` field then check relation definitions
210		if fd.as_str() == "out" {
211			// Get the table definition that this field belongs to
212			let tb = txn.get_tb(ns, db, &self.what).await?;
213			// The table is marked as TYPE RELATION
214			if let TableType::Relation(ref relation) = tb.kind {
215				// Check if a field TYPE has been specified
216				if let Some(kind) = self.kind.as_ref() {
217					// The `out` field must be a record type
218					if !kind.is_record() {
219						return Err(Error::Thrown(
220							"out field on a relation must be a record".into(),
221						));
222					}
223					// Add the TYPE to the DEFINE TABLE statement
224					if relation.from.as_ref() != self.kind.as_ref() {
225						let key = crate::key::database::tb::new(ns, db, &self.what);
226						let val = DefineTableStatement {
227							cache_fields_ts: Uuid::now_v7(),
228							kind: TableType::Relation(Relation {
229								to: self.kind.to_owned(),
230								..relation.to_owned()
231							}),
232							..tb.as_ref().to_owned()
233						};
234						txn.set(key, revision::to_vec(&val)?, None).await?;
235						// Clear the cache
236						if let Some(cache) = ctx.get_cache() {
237							cache.clear_tb(ns, db, &self.what);
238						}
239						// Clear the cache
240						txn.clear();
241					}
242				}
243			}
244		}
245		// Clear the cache
246		txn.clear();
247		// Ok all good
248		Ok(Value::None)
249	}
250
251	fn validate_reference_options(&self, ctx: &Context) -> Result<(), Error> {
252		if !ctx.get_capabilities().allows_experimental(&ExperimentalTarget::RecordReferences) {
253			return Ok(());
254		}
255
256		if let Some(kind) = &self.kind {
257			let kinds = match kind {
258				Kind::Either(kinds) => kinds,
259				kind => &vec![kind.to_owned()],
260			};
261
262			// Check if any of the kinds are references
263			if kinds.iter().any(|k| matches!(k, Kind::References(_, _))) {
264				// If any of the kinds are references, all of them must be
265				if !kinds.iter().all(|k| matches!(k, Kind::References(_, _))) {
266					return Err(Error::RefsMismatchingVariants);
267				}
268
269				// As the refs and dynrefs type essentially take over a field
270				// they are not allowed to be mixed with most other clauses
271				let typename = kind.to_string();
272
273				if self.reference.is_some() {
274					return Err(Error::RefsTypeConflict("REFERENCE".into(), typename));
275				}
276
277				if self.default.is_some() {
278					return Err(Error::RefsTypeConflict("DEFAULT".into(), typename));
279				}
280
281				if self.value.is_some() {
282					return Err(Error::RefsTypeConflict("VALUE".into(), typename));
283				}
284
285				if self.assert.is_some() {
286					return Err(Error::RefsTypeConflict("ASSERT".into(), typename));
287				}
288
289				if self.flex {
290					return Err(Error::RefsTypeConflict("FLEXIBLE".into(), typename));
291				}
292
293				if self.readonly {
294					return Err(Error::RefsTypeConflict("READONLY".into(), typename));
295				}
296			}
297
298			// If a reference is defined, the field must be a record
299			if self.reference.is_some() {
300				let kinds = match kind.non_optional() {
301					Kind::Either(kinds) => kinds,
302					Kind::Array(kind, _) | Kind::Set(kind, _) => match kind.as_ref() {
303						Kind::Either(kinds) => kinds,
304						kind => &vec![kind.to_owned()],
305					},
306					Kind::Literal(lit) => match lit {
307						Literal::Array(kinds) => kinds,
308						lit => &vec![Kind::Literal(lit.to_owned())],
309					},
310					kind => &vec![kind.to_owned()],
311				};
312
313				if !kinds.iter().all(|k| matches!(k, Kind::Record(_))) {
314					return Err(Error::ReferenceTypeConflict(kind.to_string()));
315				}
316			}
317		}
318
319		Ok(())
320	}
321
322	async fn correct_reference_type(
323		&self,
324		ctx: &Context,
325		opt: &Options,
326	) -> Result<Option<Kind>, Error> {
327		if !ctx.get_capabilities().allows_experimental(&ExperimentalTarget::RecordReferences) {
328			return Ok(None);
329		}
330
331		if let Some(Kind::References(Some(ft), Some(ff))) = &self.kind {
332			// Obtain the field definition
333			let (ns, db) = opt.ns_db()?;
334			let fd = match ctx.tx().get_tb_field(ns, db, &ft.to_string(), &ff.to_string()).await {
335				Ok(fd) => fd,
336				// If the field does not exist, there is nothing to correct
337				Err(Error::FdNotFound {
338					..
339				}) => return Ok(None),
340				Err(e) => return Err(e),
341			};
342
343			// Check if the field is an array-like value and thus "containing" references
344			let is_contained = if let Some(kind) = &fd.kind {
345				matches!(
346					kind.non_optional(),
347					Kind::Array(_, _) | Kind::Set(_, _) | Kind::Literal(Literal::Array(_))
348				)
349			} else {
350				false
351			};
352
353			// If the field is an array-like value, add the `.*` part
354			if is_contained {
355				let ff = ff.clone().push(Part::All);
356				return Ok(Some(Kind::References(Some(ft.clone()), Some(ff))));
357			}
358		}
359
360		Ok(None)
361	}
362
363	async fn disallow_mismatched_types(&self, ctx: &Context, opt: &Options) -> Result<(), Error> {
364		let (ns, db) = opt.ns_db()?;
365		let fds = ctx.tx().all_tb_fields(ns, db, &self.what, None).await?;
366
367		if let Some(self_kind) = &self.kind {
368			for fd in fds.iter() {
369				if self.name.starts_with(&fd.name) && self.name != fd.name {
370					if let Some(fd_kind) = &fd.kind {
371						let path = self.name[fd.name.len()..].to_vec();
372						if !fd_kind.allows_nested_kind(&path, self_kind) {
373							return Err(Error::MismatchedFieldTypes {
374								name: self.name.to_string(),
375								kind: self_kind.to_string(),
376								existing_name: fd.name.to_string(),
377								existing_kind: fd_kind.to_string(),
378							});
379						}
380					}
381				}
382			}
383		}
384
385		Ok(())
386	}
387}
388
389impl Display for DefineFieldStatement {
390	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
391		write!(f, "DEFINE FIELD")?;
392		if self.if_not_exists {
393			write!(f, " IF NOT EXISTS")?
394		}
395		if self.overwrite {
396			write!(f, " OVERWRITE")?
397		}
398		write!(f, " {} ON {}", self.name, self.what)?;
399		if self.flex {
400			write!(f, " FLEXIBLE")?
401		}
402		if let Some(ref v) = self.kind {
403			write!(f, " TYPE {v}")?
404		}
405		if let Some(ref v) = self.default {
406			write!(f, " DEFAULT")?;
407			if self.default_always {
408				write!(f, " ALWAYS")?
409			}
410
411			write!(f, " {v}")?
412		}
413		if self.readonly {
414			write!(f, " READONLY")?
415		}
416		if let Some(ref v) = self.value {
417			write!(f, " VALUE {v}")?
418		}
419		if let Some(ref v) = self.assert {
420			write!(f, " ASSERT {v}")?
421		}
422		if let Some(ref v) = self.reference {
423			write!(f, " REFERENCE {v}")?
424		}
425		if let Some(ref v) = self.comment {
426			write!(f, " COMMENT {v}")?
427		}
428		let _indent = if is_pretty() {
429			Some(pretty_indent())
430		} else {
431			f.write_char(' ')?;
432			None
433		};
434		// Alternate permissions display implementation ignores delete permission
435		// This display is used to show field permissions, where delete has no effect
436		// Displaying the permission could mislead users into thinking it has an effect
437		// Additionally, including the permission will cause a parsing error in 3.0.0
438		write!(f, "{:#}", self.permissions)?;
439		Ok(())
440	}
441}
442
443impl InfoStructure for DefineFieldStatement {
444	fn structure(self) -> Value {
445		Value::from(map! {
446			"name".to_string() => self.name.structure(),
447			"what".to_string() => self.what.structure(),
448			"flex".to_string() => self.flex.into(),
449			"kind".to_string(), if let Some(v) = self.kind => v.structure(),
450			"value".to_string(), if let Some(v) = self.value => v.structure(),
451			"assert".to_string(), if let Some(v) = self.assert => v.structure(),
452			"default".to_string(), if let Some(v) = self.default => v.structure(),
453			"reference".to_string(), if let Some(v) = self.reference => v.structure(),
454			"readonly".to_string() => self.readonly.into(),
455			"permissions".to_string() => self.permissions.structure(),
456			"comment".to_string(), if let Some(v) = self.comment => v.into(),
457		})
458	}
459}