surrealdb_core/kvs/node.rs

use crate::cnf::NORMAL_FETCH_SIZE;
use crate::dbs::node::Node;
use crate::err::Error;
use crate::kvs::Datastore;
use crate::kvs::KeyDecode as _;
use crate::kvs::Live;
use crate::kvs::LockType::*;
use crate::kvs::TransactionType::*;
use crate::sql::statements::LiveStatement;
use std::time::Duration;

const TARGET: &str = "surrealdb::core::kvs::node";

impl Datastore {
	/// Inserts a node for the first time into the cluster.
	///
	/// This function should be run at server or database startup.
	///
	/// This function ensures that this node is entered into the cluster
	/// membership entries, writing the initial entry and timestamp to storage.
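	///
	/// A minimal startup sketch. The `"memory"` engine path is an assumption
	/// for illustration; any supported storage engine path can be used.
	///
	/// ```ignore
	/// // Generate this node's unique identifier
	/// let id = uuid::Uuid::new_v4();
	/// // Open the datastore
	/// let ds = Datastore::new("memory").await?;
	/// // Register this node in the cluster membership entries
	/// ds.insert_node(id).await?;
	/// ```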
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn insert_node(&self, id: uuid::Uuid) -> Result<(), Error> {
		// Log when this method is run
		trace!(target: TARGET, "Inserting node in the cluster");
		// Refresh system usage metrics
		crate::sys::refresh().await;
		// Open transaction and set node data
		let txn = self.transaction(Write, Optimistic).await?;
		let key = crate::key::root::nd::Nd::new(id);
		let now = self.clock_now().await;
		let val = revision::to_vec(&Node::new(id, now, false))?;
		match run!(txn, txn.put(key, val, None).await) {
			Err(Error::TxKeyAlreadyExists) => Err(Error::ClAlreadyExists {
				id: id.to_string(),
			}),
			other => other,
		}
	}

	/// Updates an already existing node in the cluster.
	///
	/// This function should be run periodically at a regular interval.
	///
	/// This function updates the entry for this node with an up-to-date
	/// timestamp. This ensures that the node is not marked as expired by any
	/// garbage collection tasks, preventing any data cleanup for this node.
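	///
	/// A minimal heartbeat sketch, assuming `ds` and `id` from the startup
	/// flow shown in [`Self::insert_node`]. The 15 second interval and the
	/// `tokio` timer are assumptions for illustration.
	///
	/// ```ignore
	/// // Refresh this node's cluster timestamp on a fixed interval
	/// let mut interval = tokio::time::interval(std::time::Duration::from_secs(15));
	/// loop {
	///     interval.tick().await;
	///     ds.update_node(id).await?;
	/// }
	/// ```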
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn update_node(&self, id: uuid::Uuid) -> Result<(), Error> {
		// Log when this method is run
		trace!(target: TARGET, "Updating node in the cluster");
		// Refresh system usage metrics
		crate::sys::refresh().await;
		// Open transaction and set node data
		let txn = self.transaction(Write, Optimistic).await?;
		let key = crate::key::root::nd::new(id);
		let now = self.clock_now().await;
		let val = Node::new(id, now, false);
		run!(txn, txn.replace(key, revision::to_vec(&val)?).await)
	}

	/// Deletes a node from the cluster.
	///
	/// This function should be run when a node is shutting down.
	///
	/// This function marks the node as archived, ready for garbage collection.
	/// Later on, when garbage collection is running, the live queries assigned
	/// to this node will be removed, along with the node itself.
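	///
	/// A minimal shutdown sketch, assuming `id` is the identifier previously
	/// registered with [`Self::insert_node`]:
	///
	/// ```ignore
	/// // Mark this node as archived before the process exits
	/// ds.delete_node(id).await?;
	/// ```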
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn delete_node(&self, id: uuid::Uuid) -> Result<(), Error> {
		// Log when this method is run
		trace!(target: TARGET, "Archiving node in the cluster");
		// Open transaction and set node data
		let txn = self.transaction(Write, Optimistic).await?;
		let key = crate::key::root::nd::new(id);
		let val = catch!(txn, txn.get_node(id).await);
		let val = val.as_ref().archive();
		run!(txn, txn.replace(key, revision::to_vec(&val)?).await)
	}

	/// Expires nodes which have timed out of the cluster.
	///
	/// This function should be run periodically at an interval.
	///
	/// This function marks expired nodes as archived, ready for garbage
	/// collection. Later on, when garbage collection is running, the live
	/// queries assigned to those nodes will be removed, along with the nodes
	/// themselves.
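	///
	/// A minimal background-task sketch. The `tokio::spawn` wrapper, the one
	/// minute interval, and sharing the datastore behind an `Arc` are all
	/// assumptions for illustration; the 30 second heartbeat timeout is
	/// applied internally by this function.
	///
	/// ```ignore
	/// let ds = std::sync::Arc::new(ds);
	/// let maintenance = {
	///     let ds = ds.clone();
	///     tokio::spawn(async move {
	///         let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
	///         loop {
	///             interval.tick().await;
	///             // Archive any nodes whose heartbeat has timed out
	///             if let Err(e) = ds.expire_nodes().await {
	///                 eprintln!("node expiry failed: {e}");
	///             }
	///         }
	///     })
	/// };
	/// ```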
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn expire_nodes(&self) -> Result<(), Error> {
		// Log when this method is run
		trace!(target: TARGET, "Archiving expired nodes in the cluster");
		// Fetch all of the inactive nodes
		let inactive = {
			let txn = self.transaction(Read, Optimistic).await?;
			let nds = catch!(txn, txn.all_nodes().await);
			let now = self.clock_now().await;
			catch!(txn, txn.cancel().await);
			// Filter the inactive nodes
			nds.iter()
				.filter_map(|n| {
					// Check that the node is active and has expired
					match n.is_active() && n.hb < now - Duration::from_secs(30) {
						true => Some(n.to_owned()),
						false => None,
					}
				})
				.collect::<Vec<_>>()
		};
		// Check if there are inactive nodes
		if !inactive.is_empty() {
			// Open a writeable transaction
			let txn = self.transaction(Write, Optimistic).await?;
			// Archive the inactive nodes
			for nd in inactive.iter() {
				// Log the node archival
				trace!(target: TARGET, id = %nd.id, "Archiving node in the cluster");
				// Mark the node as archived
				let val = nd.archive();
				// Get the key for the node entry
				let key = crate::key::root::nd::new(nd.id);
				// Update the node entry
				catch!(txn, txn.replace(key, revision::to_vec(&val)?).await);
			}
			// Commit the changes
			catch!(txn, txn.commit().await);
		}
		// Everything was successful
		Ok(())
	}

	/// Removes and cleans up nodes which are no longer in this cluster.
	///
	/// This function should be run periodically at an interval.
	///
	/// This function clears up all nodes which have been marked as archived.
	/// When a matching node is found, all of its node live queries and table
	/// live queries are garbage collected, before the node itself is
	/// completely deleted.
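	///
	/// A minimal cleanup sketch; archived nodes are typically produced by
	/// [`Self::delete_node`] or [`Self::expire_nodes`], so one possible
	/// maintenance pass runs the expiry step first:
	///
	/// ```ignore
	/// // Archive any nodes whose heartbeat has timed out
	/// ds.expire_nodes().await?;
	/// // Then delete the archived nodes and their live queries
	/// ds.remove_nodes().await?;
	/// ```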
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn remove_nodes(&self) -> Result<(), Error> {
		// Log when this method is run
		trace!(target: TARGET, "Cleaning up archived nodes in the cluster");
		// Fetch all of the archived nodes
		let archived = {
			let txn = self.transaction(Read, Optimistic).await?;
			let nds = catch!(txn, txn.all_nodes().await);
			catch!(txn, txn.cancel().await);
			// Filter the archived nodes
			nds.iter().filter_map(Node::archived).collect::<Vec<_>>()
		};
		// Loop over the archived nodes
		for id in archived.iter() {
			// Get the key range for the live queries on this node
			let beg = crate::key::node::lq::prefix(*id)?;
			let end = crate::key::node::lq::suffix(*id)?;
			let mut next = Some(beg..end);
			// Open a writeable transaction
			let txn = self.transaction(Write, Optimistic).await?;
			{
				// Log the live query deletion
				trace!(target: TARGET, id = %id, "Deleting live queries for node");
				// Scan the live queries for this node
				while let Some(rng) = next {
					// Pause and yield execution
					yield_now!();
					// Fetch the next batch of keys and values
					let max = *NORMAL_FETCH_SIZE;
					let res = catch!(txn, txn.batch_keys_vals(rng, max, None).await);
					next = res.next;
					for (k, v) in res.result.iter() {
						// Decode the data for this live query
						let val: Live = revision::from_slice(v)?;
						// Get the key for this node live query
						let nlq = catch!(txn, crate::key::node::lq::Lq::decode(k));
						// Check that the node for this query is archived
						if archived.contains(&nlq.nd) {
							// Get the key for this table live query
							let tlq = crate::key::table::lq::new(&val.ns, &val.db, &val.tb, nlq.lq);
							// Delete the table live query
							catch!(txn, txn.clr(tlq).await);
							// Delete the node live query
							catch!(txn, txn.clr(nlq).await);
						}
					}
				}
			}
			{
				// Log the node deletion
				trace!(target: TARGET, id = %id, "Deleting node from the cluster");
				// Get the key for the node entry
				let key = crate::key::root::nd::new(*id);
				// Delete the cluster node entry
				catch!(txn, txn.clr(key).await);
			}
			// Commit the changes
			catch!(txn, txn.commit().await);
		}
		// Everything was successful
		Ok(())
	}

	/// Clean up all other miscellaneous data.
	///
	/// This function should be run periodically at an interval.
	///
	/// This function clears up all data which might have been missed from
	/// previous cleanup runs, or when previous runs failed. This function
	/// currently deletes all live queries for nodes which no longer exist
	/// in the cluster, across all namespaces, databases, and tables. It uses
	/// a number of transactions in order to prevent failure of large or
	/// long-running transactions on distributed storage engines.
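	///
	/// A minimal sketch of a full maintenance cycle. Running the garbage
	/// collection pass on a slower cadence than the node cleanup is an
	/// assumption for illustration, not a requirement of this API.
	///
	/// ```ignore
	/// let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
	/// for cycle in 0u64.. {
	///     interval.tick().await;
	///     // Archive and remove nodes which have left the cluster
	///     ds.expire_nodes().await?;
	///     ds.remove_nodes().await?;
	///     // Occasionally sweep for live queries missed by earlier runs
	///     if cycle % 10 == 0 {
	///         ds.garbage_collect().await?;
	///     }
	/// }
	/// ```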
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn garbage_collect(&self) -> Result<(), Error> {
		// Log when this method is run
		trace!(target: TARGET, "Garbage collecting all miscellaneous data");
		// Fetch archived nodes
		let archived = {
			let txn = self.transaction(Read, Optimistic).await?;
			let nds = catch!(txn, txn.all_nodes().await);
			// Filter the archived nodes
			nds.iter().filter_map(Node::archived).collect::<Vec<_>>()
		};
		// Fetch all namespaces
		let nss = {
			let txn = self.transaction(Read, Optimistic).await?;
			catch!(txn, txn.all_ns().await)
		};
		// Loop over all namespaces
		for ns in nss.iter() {
			// Log the namespace
			trace!(target: TARGET, "Garbage collecting data in namespace {}", ns.name);
			// Fetch all databases
			let dbs = {
				let txn = self.transaction(Read, Optimistic).await?;
				catch!(txn, txn.all_db(&ns.name).await)
			};
			// Loop over all databases
			for db in dbs.iter() {
				// Log the database
				trace!(target: TARGET, "Garbage collecting data in database {}/{}", ns.name, db.name);
				// Fetch all tables
				let tbs = {
					let txn = self.transaction(Read, Optimistic).await?;
					catch!(txn, txn.all_tb(&ns.name, &db.name, None).await)
				};
				// Loop over all tables
				for tb in tbs.iter() {
					// Log the table
					trace!(target: TARGET, "Garbage collecting data in table {}/{}/{}", ns.name, db.name, tb.name);
					// Iterate over the table live queries
					let beg = crate::key::table::lq::prefix(&ns.name, &db.name, &tb.name)?;
					let end = crate::key::table::lq::suffix(&ns.name, &db.name, &tb.name)?;
					let mut next = Some(beg..end);
					let txn = self.transaction(Write, Optimistic).await?;
					while let Some(rng) = next {
						// Pause and yield execution
						yield_now!();
						// Fetch the next batch of keys and values
						let max = *NORMAL_FETCH_SIZE;
						let res = catch!(txn, txn.batch_keys_vals(rng, max, None).await);
						next = res.next;
						for (k, v) in res.result.iter() {
							// Decode the LIVE query statement
							let stm: LiveStatement = revision::from_slice(v)?;
							// Get the node id and the live query id
							let (nid, lid) = (stm.node.0, stm.id.0);
							// Check that the node for this query is archived
							if archived.contains(&stm.node) {
								// Get the key for this table live query
								let tlq = catch!(txn, crate::key::table::lq::Lq::decode(k));
								// Get the key for this node live query
								let nlq = crate::key::node::lq::new(nid, lid);
								// Delete the node live query
								catch!(txn, txn.clr(nlq).await);
								// Delete the table live query
								catch!(txn, txn.clr(tlq).await);
							}
						}
					}
					// Commit the changes
					txn.commit().await?;
				}
			}
		}
		// All ok
		Ok(())
	}

	/// Clean up the live queries for a disconnected connection.
	///
	/// This function should be run when a WebSocket disconnects.
	///
	/// This function clears up the live queries on the current node, which
	/// are specified by unique live query UUIDs. This is necessary when a
	/// WebSocket disconnects, and any associated live queries need to be
	/// cleaned up and removed.
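	///
	/// A minimal disconnect-handling sketch. The `session_live_queries`
	/// vector is hypothetical; it stands in for whatever bookkeeping the
	/// connection layer uses to track the live queries it registered.
	///
	/// ```ignore
	/// // UUIDs of the live queries started over the disconnected WebSocket
	/// let session_live_queries: Vec<uuid::Uuid> = vec![];
	/// // Remove the node and table entries for each of those live queries
	/// ds.delete_queries(session_live_queries).await?;
	/// ```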
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn delete_queries(&self, ids: Vec<uuid::Uuid>) -> Result<(), Error> {
		// Log when this method is run
		trace!(target: TARGET, "Deleting live queries for a connection");
		// Open a writeable transaction
		let txn = self.transaction(Write, Optimistic).await?;
		// Loop over the live query unique ids
		for id in ids.into_iter() {
			// Get the key for this node live query
			let nlq = crate::key::node::lq::new(self.id(), id);
			// Fetch the LIVE meta data node entry
			if let Some(val) = catch!(txn, txn.get(nlq, None).await) {
				// Decode the data for this live query
				let lq: Live = revision::from_slice(&val)?;
				// Get the key for this node live query
				let nlq = crate::key::node::lq::new(self.id(), id);
				// Get the key for this table live query
				let tlq = crate::key::table::lq::new(&lq.ns, &lq.db, &lq.tb, id);
				// Delete the table live query
				catch!(txn, txn.clr(tlq).await);
				// Delete the node live query
				catch!(txn, txn.clr(nlq).await);
			}
		}
		// Commit the changes
		txn.commit().await?;
		// All ok
		Ok(())
	}
}