surrealdb_core/sql/statements/define/
index.rs

1use crate::ctx::Context;
2use crate::dbs::{Force, Options};
3use crate::doc::CursorDoc;
4use crate::err::Error;
5use crate::iam::{Action, ResourceKind};
6use crate::sql::statements::info::InfoStructure;
7use crate::sql::statements::DefineTableStatement;
8use crate::sql::statements::UpdateStatement;
9use crate::sql::{Base, Ident, Idioms, Index, Output, Part, Strand, Value, Values};
10
11use reblessive::tree::Stk;
12use revision::revisioned;
13use serde::{Deserialize, Serialize};
14use std::fmt::{self, Display};
15use std::sync::Arc;
16use uuid::Uuid;
17
18#[revisioned(revision = 4)]
19#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
20#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
21#[non_exhaustive]
22pub struct DefineIndexStatement {
23	pub name: Ident,
24	pub what: Ident,
25	pub cols: Idioms,
26	pub index: Index,
27	pub comment: Option<Strand>,
28	#[revision(start = 2)]
29	pub if_not_exists: bool,
30	#[revision(start = 3)]
31	pub overwrite: bool,
32	#[revision(start = 4)]
33	pub concurrently: bool,
34}
35
36impl DefineIndexStatement {
37	/// Process this type returning a computed simple Value
38	pub(crate) async fn compute(
39		&self,
40		stk: &mut Stk,
41		ctx: &Context,
42		opt: &Options,
43		doc: Option<&CursorDoc>,
44	) -> Result<Value, Error> {
45		// Allowed to run?
46		opt.is_allowed(Action::Edit, ResourceKind::Index, &Base::Db)?;
47		// Get the NS and DB
48		let (ns, db) = opt.ns_db()?;
49		// Fetch the transaction
50		let txn = ctx.tx();
51		// Check if the definition exists
52		if txn.get_tb_index(ns, db, &self.what, &self.name).await.is_ok() {
53			if self.if_not_exists {
54				return Ok(Value::None);
55			} else if !self.overwrite {
56				return Err(Error::IxAlreadyExists {
57					name: self.name.to_string(),
58				});
59			}
60			// Clear the index store cache
61			#[cfg(not(target_family = "wasm"))]
62			ctx.get_index_stores()
63				.index_removed(ctx.get_index_builder(), &txn, ns, db, &self.what, &self.name)
64				.await?;
65			#[cfg(target_family = "wasm")]
66			ctx.get_index_stores().index_removed(&txn, ns, db, &self.what, &self.name).await?;
67		}
68		// Does the table exists?
69		match txn.get_tb(ns, db, &self.what).await {
70			Ok(tb) => {
71				// Are we SchemaFull?
72				if tb.full {
73					// Check that the fields exists
74					for idiom in self.cols.iter() {
75						let Some(Part::Field(first)) = idiom.0.first() else {
76							continue;
77						};
78						txn.get_tb_field(ns, db, &self.what, &first.to_string()).await?;
79					}
80				}
81			}
82			// If the TB was not found, we're fine
83			Err(Error::TbNotFound {
84				..
85			}) => {}
86			// Any other error should be returned
87			Err(e) => return Err(e),
88		}
89		// Process the statement
90		let key = crate::key::table::ix::new(ns, db, &self.what, &self.name);
91		txn.get_or_add_ns(ns, opt.strict).await?;
92		txn.get_or_add_db(ns, db, opt.strict).await?;
93		txn.get_or_add_tb(ns, db, &self.what, opt.strict).await?;
94		txn.set(
95			key,
96			revision::to_vec(&DefineIndexStatement {
97				// Don't persist the `IF NOT EXISTS`, `OVERWRITE` and `CONCURRENTLY` clause to schema
98				if_not_exists: false,
99				overwrite: false,
100				concurrently: false,
101				..self.clone()
102			})?,
103			None,
104		)
105		.await?;
106		// Refresh the table cache
107		let key = crate::key::database::tb::new(ns, db, &self.what);
108		let tb = txn.get_tb(ns, db, &self.what).await?;
109		txn.set(
110			key,
111			revision::to_vec(&DefineTableStatement {
112				cache_indexes_ts: Uuid::now_v7(),
113				..tb.as_ref().clone()
114			})?,
115			None,
116		)
117		.await?;
118		// Clear the cache
119		if let Some(cache) = ctx.get_cache() {
120			cache.clear_tb(ns, db, &self.what);
121		}
122		// Clear the cache
123		txn.clear();
124		// Process the index
125		#[cfg(not(target_family = "wasm"))]
126		if self.concurrently {
127			self.async_index(ctx, opt)?;
128		} else {
129			self.sync_index(stk, ctx, opt, doc).await?;
130		}
131		#[cfg(target_family = "wasm")]
132		self.sync_index(stk, ctx, opt, doc).await?;
133		// Ok all good
134		Ok(Value::None)
135	}
136
137	async fn sync_index(
138		&self,
139		stk: &mut Stk,
140		ctx: &Context,
141		opt: &Options,
142		doc: Option<&CursorDoc>,
143	) -> Result<(), Error> {
144		// Force queries to run
145		let opt = &opt.new_with_force(Force::Index(Arc::new([self.clone()])));
146		// Update the index data
147		let stm = UpdateStatement {
148			what: Values(vec![Value::Table(self.what.clone().into())]),
149			output: Some(Output::None),
150			..UpdateStatement::default()
151		};
152		stm.compute(stk, ctx, opt, doc).await?;
153		Ok(())
154	}
155
156	#[cfg(not(target_family = "wasm"))]
157	fn async_index(&self, ctx: &Context, opt: &Options) -> Result<(), Error> {
158		ctx.get_index_builder().ok_or_else(|| fail!("No Index Builder"))?.build(
159			ctx,
160			opt.clone(),
161			self.clone().into(),
162		)
163	}
164}
165
166impl Display for DefineIndexStatement {
167	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
168		write!(f, "DEFINE INDEX")?;
169		if self.if_not_exists {
170			write!(f, " IF NOT EXISTS")?
171		}
172		if self.overwrite {
173			write!(f, " OVERWRITE")?
174		}
175		write!(f, " {} ON {} FIELDS {}", self.name, self.what, self.cols)?;
176		if Index::Idx != self.index {
177			write!(f, " {}", self.index)?;
178		}
179		if let Some(ref v) = self.comment {
180			write!(f, " COMMENT {v}")?
181		}
182		if self.concurrently {
183			write!(f, " CONCURRENTLY")?
184		}
185		Ok(())
186	}
187}
188
189impl InfoStructure for DefineIndexStatement {
190	fn structure(self) -> Value {
191		Value::from(map! {
192			"name".to_string() => self.name.structure(),
193			"what".to_string() => self.what.structure(),
194			"cols".to_string() => self.cols.structure(),
195			"index".to_string() => self.index.structure(),
196			"comment".to_string(), if let Some(v) = self.comment => v.into(),
197		})
198	}
199}