1use super::KeyDecode as _;
2use super::Transaction;
3use crate::cnf::EXPORT_BATCH_SIZE;
4use crate::err::Error;
5use crate::key::thing;
6use crate::sql::paths::EDGE;
7use crate::sql::paths::IN;
8use crate::sql::paths::OUT;
9use crate::sql::statements::DefineTableStatement;
10use crate::sql::Value;
11use async_channel::Sender;
12use chrono::prelude::Utc;
13use chrono::TimeZone;
14
15#[derive(Clone, Debug)]
16pub struct Config {
17 pub users: bool,
18 pub accesses: bool,
19 pub params: bool,
20 pub functions: bool,
21 pub analyzers: bool,
22 pub tables: TableConfig,
23 pub versions: bool,
24 pub records: bool,
25}
26
27impl Default for Config {
28 fn default() -> Config {
29 Config {
30 users: true,
31 accesses: true,
32 params: true,
33 functions: true,
34 analyzers: true,
35 tables: TableConfig::default(),
36 versions: false,
37 records: true,
38 }
39 }
40}
41
42impl From<Config> for Value {
43 fn from(config: Config) -> Value {
44 let obj = map!(
45 "users" => config.users.into(),
46 "accesses" => config.accesses.into(),
47 "params" => config.params.into(),
48 "functions" => config.functions.into(),
49 "analyzers" => config.analyzers.into(),
50 "versions" => config.versions.into(),
51 "records" => config.records.into(),
52 "tables" => match config.tables {
53 TableConfig::All => true.into(),
54 TableConfig::None => false.into(),
55 TableConfig::Some(v) => v.into()
56 }
57 );
58
59 obj.into()
60 }
61}
62
63impl TryFrom<&Value> for Config {
64 type Error = Error;
65 fn try_from(value: &Value) -> Result<Self, Self::Error> {
66 match value {
67 Value::Object(obj) => {
68 let mut config = Config::default();
69
70 macro_rules! bool_prop {
71 ($prop:ident) => {{
72 match obj.get(stringify!($prop)) {
73 Some(Value::Bool(v)) => {
74 config.$prop = v.to_owned();
75 }
76 Some(v) => {
77 return Err(Error::InvalidExportConfig(
78 v.to_owned(),
79 "a bool".into(),
80 ))
81 }
82 _ => (),
83 }
84 }};
85 }
86
87 bool_prop!(users);
88 bool_prop!(accesses);
89 bool_prop!(params);
90 bool_prop!(functions);
91 bool_prop!(analyzers);
92 bool_prop!(versions);
93 bool_prop!(records);
94
95 if let Some(v) = obj.get("tables") {
96 config.tables = v.try_into()?;
97 }
98
99 Ok(config)
100 }
101 v => Err(Error::InvalidExportConfig(v.to_owned(), "an object".into())),
102 }
103 }
104}
105
/// Selects which tables take part in an export.
#[derive(Clone, Debug, Default)]
pub enum TableConfig {
    /// Every table is exported. This is the default selection.
    #[default]
    All,
    /// No table is exported.
    None,
    /// Only the tables whose names are listed are exported.
    Some(Vec<String>),
}
113
114impl From<bool> for TableConfig {
115 fn from(value: bool) -> Self {
116 match value {
117 true => TableConfig::All,
118 false => TableConfig::None,
119 }
120 }
121}
122
123impl From<Vec<String>> for TableConfig {
124 fn from(value: Vec<String>) -> Self {
125 TableConfig::Some(value)
126 }
127}
128
129impl From<Vec<&str>> for TableConfig {
130 fn from(value: Vec<&str>) -> Self {
131 TableConfig::Some(value.into_iter().map(ToOwned::to_owned).collect())
132 }
133}
134
135impl TryFrom<&Value> for TableConfig {
136 type Error = Error;
137 fn try_from(value: &Value) -> Result<Self, Self::Error> {
138 match value {
139 Value::Bool(b) => match b {
140 true => Ok(TableConfig::All),
141 false => Ok(TableConfig::None),
142 },
143 Value::None | Value::Null => Ok(TableConfig::None),
144 Value::Array(v) => v
145 .iter()
146 .cloned()
147 .map(|v| match v {
148 Value::Strand(str) => Ok(str.0),
149 v => Err(Error::InvalidExportConfig(v.to_owned(), "a string".into())),
150 })
151 .collect::<Result<Vec<String>, Error>>()
152 .map(TableConfig::Some),
153 v => Err(Error::InvalidExportConfig(
154 v.to_owned(),
155 "a bool, none, null or array<string>".into(),
156 )),
157 }
158 }
159}
160
161impl TableConfig {
162 pub(crate) fn is_any(&self) -> bool {
164 matches!(self, Self::All | Self::Some(_))
165 }
166 pub(crate) fn includes(&self, table: &str) -> bool {
168 match self {
169 Self::All => true,
170 Self::None => false,
171 Self::Some(v) => v.iter().any(|v| v.eq(table)),
172 }
173 }
174}
175
176impl Transaction {
177 pub async fn export(
179 &self,
180 ns: &str,
181 db: &str,
182 cfg: Config,
183 chn: Sender<Vec<u8>>,
184 ) -> Result<(), Error> {
185 self.export_metadata(&cfg, &chn, ns, db).await?;
187 self.export_tables(ns, db, &cfg, &chn).await?;
189 Ok(())
190 }
191
192 async fn export_metadata(
193 &self,
194 cfg: &Config,
195 chn: &Sender<Vec<u8>>,
196 ns: &str,
197 db: &str,
198 ) -> Result<(), Error> {
199 self.export_section("OPTION", vec!["OPTION IMPORT"], chn).await?;
201
202 if cfg.users {
204 let users = self.all_db_users(ns, db).await?;
205 self.export_section("USERS", users.to_vec(), chn).await?;
206 }
207
208 if cfg.accesses {
210 let accesses = self.all_db_accesses(ns, db).await?;
211 self.export_section("ACCESSES", accesses.to_vec(), chn).await?;
212 }
213
214 if cfg.params {
216 let params = self.all_db_params(ns, db).await?;
217 self.export_section("PARAMS", params.to_vec(), chn).await?;
218 }
219
220 if cfg.functions {
222 let functions = self.all_db_functions(ns, db).await?;
223 self.export_section("FUNCTIONS", functions.to_vec(), chn).await?;
224 }
225
226 if cfg.analyzers {
228 let analyzers = self.all_db_analyzers(ns, db).await?;
229 self.export_section("ANALYZERS", analyzers.to_vec(), chn).await?;
230 }
231
232 Ok(())
233 }
234
235 async fn export_section<T: ToString>(
236 &self,
237 title: &str,
238 items: Vec<T>,
239 chn: &Sender<Vec<u8>>,
240 ) -> Result<(), Error> {
241 if items.is_empty() {
242 return Ok(());
243 }
244
245 chn.send(bytes!("-- ------------------------------")).await?;
246 chn.send(bytes!(format!("-- {}", title))).await?;
247 chn.send(bytes!("-- ------------------------------")).await?;
248 chn.send(bytes!("")).await?;
249
250 for item in items {
251 chn.send(bytes!(format!("{};", item.to_string()))).await?;
252 }
253
254 chn.send(bytes!("")).await?;
255 Ok(())
256 }
257
258 async fn export_tables(
259 &self,
260 ns: &str,
261 db: &str,
262 cfg: &Config,
263 chn: &Sender<Vec<u8>>,
264 ) -> Result<(), Error> {
265 if !cfg.tables.is_any() {
267 return Ok(());
268 }
269 let tables = self.all_tb(ns, db, None).await?;
271 for table in tables.iter() {
273 if !cfg.tables.includes(&table.name) {
275 continue;
276 }
277 self.export_table_structure(ns, db, table, chn).await?;
279 if cfg.records {
281 self.export_table_data(ns, db, table, cfg, chn).await?;
282 }
283 }
284
285 Ok(())
286 }
287
288 async fn export_table_structure(
289 &self,
290 ns: &str,
291 db: &str,
292 table: &DefineTableStatement,
293 chn: &Sender<Vec<u8>>,
294 ) -> Result<(), Error> {
295 chn.send(bytes!("-- ------------------------------")).await?;
296 chn.send(bytes!(format!("-- TABLE: {}", table.name))).await?;
297 chn.send(bytes!("-- ------------------------------")).await?;
298 chn.send(bytes!("")).await?;
299 chn.send(bytes!(format!("{};", table))).await?;
300 chn.send(bytes!("")).await?;
301 let fields = self.all_tb_fields(ns, db, &table.name, None).await?;
303 for field in fields.iter() {
304 chn.send(bytes!(format!("{};", field))).await?;
305 }
306 chn.send(bytes!("")).await?;
307 let indexes = self.all_tb_indexes(ns, db, &table.name).await?;
309 for index in indexes.iter() {
310 chn.send(bytes!(format!("{};", index))).await?;
311 }
312 chn.send(bytes!("")).await?;
313 let events = self.all_tb_events(ns, db, &table.name).await?;
315 for event in events.iter() {
316 chn.send(bytes!(format!("{};", event))).await?;
317 }
318 chn.send(bytes!("")).await?;
319 Ok(())
321 }
322
323 async fn export_table_data(
324 &self,
325 ns: &str,
326 db: &str,
327 table: &DefineTableStatement,
328 cfg: &Config,
329 chn: &Sender<Vec<u8>>,
330 ) -> Result<(), Error> {
331 chn.send(bytes!("-- ------------------------------")).await?;
332 chn.send(bytes!(format!("-- TABLE DATA: {}", table.name))).await?;
333 chn.send(bytes!("-- ------------------------------")).await?;
334 chn.send(bytes!("")).await?;
335
336 let beg = crate::key::thing::prefix(ns, db, &table.name)?;
337 let end = crate::key::thing::suffix(ns, db, &table.name)?;
338 let mut next = Some(beg..end);
339
340 while let Some(rng) = next {
341 if cfg.versions {
342 let batch = self.batch_keys_vals_versions(rng, *EXPORT_BATCH_SIZE).await?;
343 next = batch.next;
344 if batch.result.is_empty() {
346 break;
347 }
348 self.export_versioned_data(batch.result, chn).await?;
349 } else {
350 let batch = self.batch_keys_vals(rng, *EXPORT_BATCH_SIZE, None).await?;
351 next = batch.next;
352 if batch.result.is_empty() {
354 break;
355 }
356 self.export_regular_data(batch.result, chn).await?;
357 }
358 continue;
360 }
361
362 chn.send(bytes!("")).await?;
363 Ok(())
364 }
365
366 fn process_value(
383 k: thing::Thing,
384 v: Value,
385 records_relate: &mut Vec<String>,
386 records_normal: &mut Vec<String>,
387 is_tombstone: Option<bool>,
388 version: Option<u64>,
389 ) -> String {
390 match (v.pick(&*EDGE), v.pick(&*IN), v.pick(&*OUT)) {
392 (Value::Bool(true), Value::Thing(_), Value::Thing(_)) => {
394 if let Some(version) = version {
395 let ts = Utc.timestamp_nanos(version as i64);
397 let sql = format!("INSERT RELATION {} VERSION d'{:?}';", v, ts);
398 records_relate.push(sql);
399 String::new()
400 } else {
401 records_relate.push(v.to_string());
403 String::new()
404 }
405 }
406 _ => {
408 if let Some(is_tombstone) = is_tombstone {
409 if is_tombstone {
410 format!("DELETE {}:{};", k.tb, k.id)
412 } else {
413 let ts = Utc.timestamp_nanos(version.unwrap() as i64);
415 format!("INSERT {} VERSION d'{:?}';", v, ts)
416 }
417 } else {
418 records_normal.push(v.to_string());
420 String::new()
421 }
422 }
423 }
424 }
425
426 async fn export_versioned_data(
442 &self,
443 versioned_values: Vec<(Vec<u8>, Vec<u8>, u64, bool)>,
444 chn: &Sender<Vec<u8>>,
445 ) -> Result<(), Error> {
446 let mut records_relate = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
448
449 let mut counter = 0;
451
452 for (k, v, version, is_tombstone) in versioned_values {
454 if counter % *EXPORT_BATCH_SIZE == 0 {
456 chn.send(bytes!("BEGIN;")).await?;
457 }
458
459 let k = thing::Thing::decode(&k)?;
460 let v: Value = if v.is_empty() {
461 Value::None
462 } else {
463 revision::from_slice(&v)?
464 };
465 let sql = Self::process_value(
467 k,
468 v,
469 &mut records_relate,
470 &mut Vec::new(),
471 Some(is_tombstone),
472 Some(version),
473 );
474 if !sql.is_empty() {
476 chn.send(bytes!(sql)).await?;
477 }
478
479 counter += 1;
481
482 if counter % *EXPORT_BATCH_SIZE == 0 {
484 chn.send(bytes!("COMMIT;")).await?;
485 }
486 }
487
488 if counter % *EXPORT_BATCH_SIZE != 0 {
490 chn.send(bytes!("COMMIT;")).await?;
491 }
492
493 if records_relate.is_empty() {
495 return Ok(());
496 }
497
498 chn.send(bytes!("BEGIN;")).await?;
500
501 if !records_relate.is_empty() {
503 for record in records_relate.iter() {
504 chn.send(bytes!(record)).await?;
505 }
506 }
507
508 chn.send(bytes!("COMMIT;")).await?;
510
511 Ok(())
512 }
513
514 async fn export_regular_data(
530 &self,
531 regular_values: Vec<(Vec<u8>, Vec<u8>)>,
532 chn: &Sender<Vec<u8>>,
533 ) -> Result<(), Error> {
534 let mut records_normal = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
536 let mut records_relate = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
537
538 for (k, v) in regular_values {
540 let k = thing::Thing::decode(&k)?;
541 let v: Value = revision::from_slice(&v)?;
542 Self::process_value(k, v, &mut records_relate, &mut records_normal, None, None);
544 }
545
546 if !records_normal.is_empty() {
548 let values = records_normal.join(", ");
549 let sql = format!("INSERT [ {} ];", values);
550 chn.send(bytes!(sql)).await?;
551 }
552
553 if !records_relate.is_empty() {
555 let values = records_relate.join(", ");
556 let sql = format!("INSERT RELATION [ {} ];", values);
557 chn.send(bytes!(sql)).await?;
558 }
559
560 Ok(())
561 }
562}