1use crate::cnf::NORMAL_FETCH_SIZE;
2use crate::dbs::node::Node;
3use crate::err::Error;
4use crate::kvs::Datastore;
5use crate::kvs::KeyDecode as _;
6use crate::kvs::Live;
7use crate::kvs::LockType::*;
8use crate::kvs::TransactionType::*;
9use crate::sql::statements::LiveStatement;
10use std::time::Duration;
11
12const TARGET: &str = "surrealdb::core::kvs::node";
13
14impl Datastore {
15 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
23 pub async fn insert_node(&self, id: uuid::Uuid) -> Result<(), Error> {
24 trace!(target: TARGET, "Inserting node in the cluster");
26 crate::sys::refresh().await;
28 let txn = self.transaction(Write, Optimistic).await?;
30 let key = crate::key::root::nd::Nd::new(id);
31 let now = self.clock_now().await;
32 let val = revision::to_vec(&Node::new(id, now, false))?;
33 match run!(txn, txn.put(key, val, None).await) {
34 Err(Error::TxKeyAlreadyExists) => Err(Error::ClAlreadyExists {
35 id: id.to_string(),
36 }),
37 other => other,
38 }
39 }
40
41 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
49 pub async fn update_node(&self, id: uuid::Uuid) -> Result<(), Error> {
50 trace!(target: TARGET, "Updating node in the cluster");
52 crate::sys::refresh().await;
54 let txn = self.transaction(Write, Optimistic).await?;
56 let key = crate::key::root::nd::new(id);
57 let now = self.clock_now().await;
58 let val = Node::new(id, now, false);
59 run!(txn, txn.replace(key, revision::to_vec(&val)?).await)
60 }
61
62 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
70 pub async fn delete_node(&self, id: uuid::Uuid) -> Result<(), Error> {
71 trace!(target: TARGET, "Archiving node in the cluster");
73 let txn = self.transaction(Write, Optimistic).await?;
75 let key = crate::key::root::nd::new(id);
76 let val = catch!(txn, txn.get_node(id).await);
77 let val = val.as_ref().archive();
78 run!(txn, txn.replace(key, revision::to_vec(&val)?).await)
79 }
80
81 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
89 pub async fn expire_nodes(&self) -> Result<(), Error> {
90 trace!(target: TARGET, "Archiving expired nodes in the cluster");
92 let inactive = {
94 let txn = self.transaction(Read, Optimistic).await?;
95 let nds = catch!(txn, txn.all_nodes().await);
96 let now = self.clock_now().await;
97 catch!(txn, txn.cancel().await);
98 nds.iter()
100 .filter_map(|n| {
101 match n.is_active() && n.hb < now - Duration::from_secs(30) {
103 true => Some(n.to_owned()),
104 false => None,
105 }
106 })
107 .collect::<Vec<_>>()
108 };
109 if !inactive.is_empty() {
111 let txn = self.transaction(Write, Optimistic).await?;
113 for nd in inactive.iter() {
115 trace!(target: TARGET, id = %nd.id, "Archiving node in the cluster");
117 let val = nd.archive();
119 let key = crate::key::root::nd::new(nd.id);
121 catch!(txn, txn.replace(key, revision::to_vec(&val)?).await);
123 }
124 catch!(txn, txn.commit().await);
126 }
127 Ok(())
129 }
130
131 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
139 pub async fn remove_nodes(&self) -> Result<(), Error> {
140 trace!(target: TARGET, "Cleaning up archived nodes in the cluster");
142 let archived = {
144 let txn = self.transaction(Read, Optimistic).await?;
145 let nds = catch!(txn, txn.all_nodes().await);
146 catch!(txn, txn.cancel().await);
147 nds.iter().filter_map(Node::archived).collect::<Vec<_>>()
149 };
150 for id in archived.iter() {
152 let beg = crate::key::node::lq::prefix(*id)?;
154 let end = crate::key::node::lq::suffix(*id)?;
155 let mut next = Some(beg..end);
156 let txn = self.transaction(Write, Optimistic).await?;
157 {
158 trace!(target: TARGET, id = %id, "Deleting live queries for node");
160 while let Some(rng) = next {
162 yield_now!();
164 let max = *NORMAL_FETCH_SIZE;
166 let res = catch!(txn, txn.batch_keys_vals(rng, max, None).await);
167 next = res.next;
168 for (k, v) in res.result.iter() {
169 let val: Live = revision::from_slice(v)?;
171 let nlq = catch!(txn, crate::key::node::lq::Lq::decode(k));
173 if archived.contains(&nlq.nd) {
175 let tlq = crate::key::table::lq::new(&val.ns, &val.db, &val.tb, nlq.lq);
177 catch!(txn, txn.clr(tlq).await);
179 catch!(txn, txn.clr(nlq).await);
181 }
182 }
183 }
184 }
185 {
186 trace!(target: TARGET, id = %id, "Deleting node from the cluster");
188 let key = crate::key::root::nd::new(*id);
190 catch!(txn, txn.clr(key).await);
192 }
193 catch!(txn, txn.commit().await);
195 }
196 Ok(())
198 }
199
200 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
211 pub async fn garbage_collect(&self) -> Result<(), Error> {
212 trace!(target: TARGET, "Garbage collecting all miscellaneous data");
214 let archived = {
216 let txn = self.transaction(Read, Optimistic).await?;
217 let nds = catch!(txn, txn.all_nodes().await);
218 nds.iter().filter_map(Node::archived).collect::<Vec<_>>()
220 };
221 let nss = {
223 let txn = self.transaction(Read, Optimistic).await?;
224 catch!(txn, txn.all_ns().await)
225 };
226 for ns in nss.iter() {
228 trace!(target: TARGET, "Garbage collecting data in namespace {}", ns.name);
230 let dbs = {
232 let txn = self.transaction(Read, Optimistic).await?;
233 catch!(txn, txn.all_db(&ns.name).await)
234 };
235 for db in dbs.iter() {
237 trace!(target: TARGET, "Garbage collecting data in database {}/{}", ns.name, db.name);
239 let tbs = {
241 let txn = self.transaction(Read, Optimistic).await?;
242 catch!(txn, txn.all_tb(&ns.name, &db.name, None).await)
243 };
244 for tb in tbs.iter() {
246 trace!(target: TARGET, "Garbage collecting data in table {}/{}/{}", ns.name, db.name, tb.name);
248 let beg = crate::key::table::lq::prefix(&ns.name, &db.name, &tb.name)?;
250 let end = crate::key::table::lq::suffix(&ns.name, &db.name, &tb.name)?;
251 let mut next = Some(beg..end);
252 let txn = self.transaction(Write, Optimistic).await?;
253 while let Some(rng) = next {
254 yield_now!();
256 let max = *NORMAL_FETCH_SIZE;
258 let res = catch!(txn, txn.batch_keys_vals(rng, max, None).await);
259 next = res.next;
260 for (k, v) in res.result.iter() {
261 let stm: LiveStatement = revision::from_slice(v)?;
263 let (nid, lid) = (stm.node.0, stm.id.0);
265 if archived.contains(&stm.node) {
267 let tlq = catch!(txn, crate::key::table::lq::Lq::decode(k));
269 let nlq = crate::key::node::lq::new(nid, lid);
271 catch!(txn, txn.clr(nlq).await);
273 catch!(txn, txn.clr(tlq).await);
275 }
276 }
277 }
278 txn.commit().await?;
280 }
281 }
282 }
283 Ok(())
285 }
286
287 #[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
296 pub async fn delete_queries(&self, ids: Vec<uuid::Uuid>) -> Result<(), Error> {
297 trace!(target: TARGET, "Deleting live queries for a connection");
299 let txn = self.transaction(Write, Optimistic).await?;
301 for id in ids.into_iter() {
303 let nlq = crate::key::node::lq::new(self.id(), id);
305 if let Some(val) = catch!(txn, txn.get(nlq, None).await) {
307 let lq: Live = revision::from_slice(&val)?;
309 let nlq = crate::key::node::lq::new(self.id(), id);
311 let tlq = crate::key::table::lq::new(&lq.ns, &lq.db, &lq.tb, id);
313 catch!(txn, txn.clr(tlq).await);
315 catch!(txn, txn.clr(nlq).await);
317 }
318 }
319 txn.commit().await?;
321 Ok(())
323 }
324}