1use std::{collections::HashSet, fmt::Write, time::Duration};
2
3use eyre::{bail, eyre, Result};
4use indicatif::{ProgressBar, ProgressState, ProgressStyle};
5use rmp::decode::Bytes;
6
7use crate::{
8 database::{current_context, Database},
9 record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store},
10};
11use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordId, RecordIdx};
12
13use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION};
14
15#[derive(Debug, Clone)]
16pub struct HistoryStore {
17 pub store: SqliteStore,
18 pub host_id: HostId,
19 pub encryption_key: [u8; 32],
20}
21
22#[derive(Debug, Eq, PartialEq, Clone)]
23pub enum HistoryRecord {
24 Create(History), Delete(HistoryId), }
27
28impl HistoryRecord {
29 pub fn serialize(&self) -> Result<DecryptedData> {
42 use rmp::encode;
45
46 let mut output = vec![];
47
48 match self {
49 HistoryRecord::Create(history) => {
50 encode::write_u8(&mut output, 0)?;
52
53 let bytes = history.serialize()?;
54
55 encode::write_bin(&mut output, &bytes.0)?;
56 }
57 HistoryRecord::Delete(id) => {
58 encode::write_u8(&mut output, 1)?;
60 encode::write_str(&mut output, id.0.as_str())?;
61 }
62 };
63
64 Ok(DecryptedData(output))
65 }
66
67 pub fn deserialize(bytes: &DecryptedData, version: &str) -> Result<Self> {
68 use rmp::decode;
69
70 fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report {
71 eyre!("{err:?}")
72 }
73
74 let mut bytes = Bytes::new(&bytes.0);
75
76 let record_type = decode::read_u8(&mut bytes).map_err(error_report)?;
77
78 match record_type {
79 0 => {
81 let _ = decode::read_bin_len(&mut bytes).map_err(error_report)?;
84
85 let record = History::deserialize(bytes.remaining_slice(), version)?;
86
87 Ok(HistoryRecord::Create(record))
88 }
89
90 1 => {
92 let bytes = bytes.remaining_slice();
93 let (id, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;
94
95 if !bytes.is_empty() {
96 bail!(
97 "trailing bytes decoding HistoryRecord::Delete - malformed? got {bytes:?}"
98 );
99 }
100
101 Ok(HistoryRecord::Delete(id.to_string().into()))
102 }
103
104 n => {
105 bail!("unknown HistoryRecord type {n}")
106 }
107 }
108 }
109}
110
111impl HistoryStore {
112 pub fn new(store: SqliteStore, host_id: HostId, encryption_key: [u8; 32]) -> Self {
113 HistoryStore {
114 store,
115 host_id,
116 encryption_key,
117 }
118 }
119
120 async fn push_record(&self, record: HistoryRecord) -> Result<(RecordId, RecordIdx)> {
121 let bytes = record.serialize()?;
122 let idx = self
123 .store
124 .last(self.host_id, HISTORY_TAG)
125 .await?
126 .map_or(0, |p| p.idx + 1);
127
128 let record = Record::builder()
129 .host(Host::new(self.host_id))
130 .version(HISTORY_VERSION.to_string())
131 .tag(HISTORY_TAG.to_string())
132 .idx(idx)
133 .data(bytes)
134 .build();
135
136 let id = record.id;
137
138 self.store
139 .push(&record.encrypt::<PASETO_V4>(&self.encryption_key))
140 .await?;
141
142 Ok((id, idx))
143 }
144
145 async fn push_batch(&self, records: impl Iterator<Item = HistoryRecord>) -> Result<()> {
146 let mut ret = Vec::new();
147
148 let idx = self
149 .store
150 .last(self.host_id, HISTORY_TAG)
151 .await?
152 .map_or(0, |p| p.idx + 1);
153
154 for (n, record) in records.enumerate() {
157 let bytes = record.serialize()?;
158
159 let record = Record::builder()
160 .host(Host::new(self.host_id))
161 .version(HISTORY_VERSION.to_string())
162 .tag(HISTORY_TAG.to_string())
163 .idx(idx + n as u64)
164 .data(bytes)
165 .build();
166
167 let record = record.encrypt::<PASETO_V4>(&self.encryption_key);
168
169 ret.push(record);
170 }
171
172 self.store.push_batch(ret.iter()).await?;
173
174 Ok(())
175 }
176
177 pub async fn delete(&self, id: HistoryId) -> Result<(RecordId, RecordIdx)> {
178 let record = HistoryRecord::Delete(id);
179
180 self.push_record(record).await
181 }
182
183 pub async fn push(&self, history: History) -> Result<(RecordId, RecordIdx)> {
184 let record = HistoryRecord::Create(history);
187
188 self.push_record(record).await
189 }
190
191 pub async fn history(&self) -> Result<Vec<HistoryRecord>> {
192 let records = self.store.all_tagged(HISTORY_TAG).await?;
195 let mut ret = Vec::with_capacity(records.len());
196
197 for record in records.into_iter() {
198 let hist = match record.version.as_str() {
199 HISTORY_VERSION => {
200 let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
201
202 HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)
203 }
204 version => bail!("unknown history version {version:?}"),
205 }?;
206
207 ret.push(hist);
208 }
209
210 Ok(ret)
211 }
212
213 pub async fn build(&self, database: &dyn Database) -> Result<()> {
214 let history = self.history().await?;
219
220 let mut creates = Vec::new();
226 let mut deletes = Vec::new();
227
228 for i in history {
229 match i {
230 HistoryRecord::Create(h) => {
231 creates.push(h);
232 }
233 HistoryRecord::Delete(id) => {
234 deletes.push(id);
235 }
236 }
237 }
238
239 database.save_bulk(&creates).await?;
240 database.delete_rows(&deletes).await?;
241
242 Ok(())
243 }
244
245 pub async fn incremental_build(&self, database: &dyn Database, ids: &[RecordId]) -> Result<()> {
246 for id in ids {
247 let record = self.store.get(*id).await;
248
249 let record = if let Ok(record) = record {
250 record
251 } else {
252 continue;
253 };
254
255 if record.tag != HISTORY_TAG {
256 continue;
257 }
258
259 let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
260 let record = HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)?;
261
262 match record {
263 HistoryRecord::Create(h) => {
264 database.save(&h).await?;
266 }
267 HistoryRecord::Delete(id) => {
268 database.delete_rows(&[id]).await?;
269 }
270 }
271 }
272
273 Ok(())
274 }
275
276 pub async fn history_ids(&self) -> Result<HashSet<HistoryId>> {
280 let history = self.history().await?;
281
282 let ret = HashSet::from_iter(history.iter().map(|h| match h {
283 HistoryRecord::Create(h) => h.id.clone(),
284 HistoryRecord::Delete(id) => id.clone(),
285 }));
286
287 Ok(ret)
288 }
289
290 pub async fn init_store(&self, db: &impl Database) -> Result<()> {
291 let pb = ProgressBar::new_spinner();
292 pb.set_style(
293 ProgressStyle::with_template("{spinner:.blue} {msg}")
294 .unwrap()
295 .with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
296 write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()
297 })
298 .progress_chars("#>-"),
299 );
300 pb.enable_steady_tick(Duration::from_millis(500));
301
302 pb.set_message("Fetching history from old database");
303
304 let context = current_context();
305 let history = db.list(&[], &context, None, false, true).await?;
306
307 pb.set_message("Fetching history already in store");
308 let store_ids = self.history_ids().await?;
309
310 pb.set_message("Converting old history to new store");
311 let mut records = Vec::new();
312
313 for i in history {
314 debug!("loaded {}", i.id);
315
316 if store_ids.contains(&i.id) {
317 debug!("skipping {} - already exists", i.id);
318 continue;
319 }
320
321 if i.deleted_at.is_some() {
322 records.push(HistoryRecord::Delete(i.id));
323 } else {
324 records.push(HistoryRecord::Create(i));
325 }
326 }
327
328 pb.set_message("Writing to db");
329
330 if !records.is_empty() {
331 self.push_batch(records.into_iter()).await?;
332 }
333
334 pb.finish_with_message("Import complete");
335
336 Ok(())
337 }
338}
339
#[cfg(test)]
mod tests {
    use atuin_common::record::DecryptedData;
    use time::macros::datetime;

    use crate::history::{store::HistoryRecord, HISTORY_VERSION};

    use super::History;

    /// Asserts that `record` serializes to exactly `expected`, and that both
    /// the freshly serialized bytes and the fixture bytes decode back to
    /// `record`.
    fn assert_round_trip(record: &HistoryRecord, expected: &[u8]) {
        let serialized = record.serialize().expect("failed to serialize history");
        assert_eq!(serialized.0, expected);

        let from_serialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION)
            .expect("failed to deserialize HistoryRecord");
        assert_eq!(&from_serialized, record);

        // Decoding the fixture directly pins the wire format independently of
        // the current serializer.
        let from_fixture =
            HistoryRecord::deserialize(&DecryptedData(expected.to_vec()), HISTORY_VERSION)
                .expect("failed to deserialize HistoryRecord");
        assert_eq!(&from_fixture, record);
    }

    #[test]
    fn test_serialize_deserialize_create() {
        let expected = [
            204, 0, 196, 141, 205, 0, 0, 153, 217, 32, 48, 49, 56, 99, 100, 52, 102, 101, 56, 49,
            55, 53, 55, 99, 100, 50, 97, 101, 101, 54, 53, 99, 100, 55, 56, 54, 49, 102, 57, 99,
            56, 49, 207, 23, 166, 251, 212, 181, 82, 0, 0, 100, 0, 162, 108, 115, 217, 41, 47, 85,
            115, 101, 114, 115, 47, 101, 108, 108, 105, 101, 47, 115, 114, 99, 47, 103, 105, 116,
            104, 117, 98, 46, 99, 111, 109, 47, 97, 116, 117, 105, 110, 115, 104, 47, 97, 116, 117,
            105, 110, 217, 32, 48, 49, 56, 99, 100, 52, 102, 101, 97, 100, 56, 57, 55, 53, 57, 55,
            56, 53, 50, 53, 50, 55, 97, 51, 49, 99, 57, 57, 56, 48, 53, 57, 170, 98, 111, 111, 112,
            58, 101, 108, 108, 105, 101, 192,
        ];

        let entry = History {
            id: "018cd4fe81757cd2aee65cd7861f9c81".to_owned().into(),
            timestamp: datetime!(2024-01-04 00:00:00.000000 +00:00),
            duration: 100,
            exit: 0,
            command: "ls".to_owned(),
            cwd: "/Users/ellie/src/github.com/atuinsh/atuin".to_owned(),
            session: "018cd4fead897597852527a31c998059".to_owned(),
            hostname: "boop:ellie".to_owned(),
            deleted_at: None,
        };

        assert_round_trip(&HistoryRecord::Create(entry), &expected);
    }

    #[test]
    fn test_serialize_deserialize_delete() {
        let expected = [
            204, 1, 217, 32, 48, 49, 56, 99, 100, 52, 102, 101, 56, 49, 55, 53, 55, 99, 100, 50,
            97, 101, 101, 54, 53, 99, 100, 55, 56, 54, 49, 102, 57, 99, 56, 49,
        ];

        let removal = HistoryRecord::Delete("018cd4fe81757cd2aee65cd7861f9c81".to_string().into());

        assert_round_trip(&removal, &expected);
    }
}