// atuin_client/history/store.rs

1use std::{collections::HashSet, fmt::Write, time::Duration};
2
3use eyre::{bail, eyre, Result};
4use indicatif::{ProgressBar, ProgressState, ProgressStyle};
5use rmp::decode::Bytes;
6
7use crate::{
8    database::{current_context, Database},
9    record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store},
10};
11use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordId, RecordIdx};
12
13use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION};
14
15#[derive(Debug, Clone)]
16pub struct HistoryStore {
17    pub store: SqliteStore,
18    pub host_id: HostId,
19    pub encryption_key: [u8; 32],
20}
21
22#[derive(Debug, Eq, PartialEq, Clone)]
23pub enum HistoryRecord {
24    Create(History),   // Create a history record
25    Delete(HistoryId), // Delete a history record, identified by ID
26}
27
28impl HistoryRecord {
29    /// Serialize a history record, returning DecryptedData
30    /// The record will be of a certain type
31    /// We map those like so:
32    ///
33    /// HistoryRecord::Create -> 0
34    /// HistoryRecord::Delete-> 1
35    ///
36    /// This numeric identifier is then written as the first byte to the buffer. For history, we
37    /// append the serialized history right afterwards, to avoid having to handle serialization
38    /// twice.
39    ///
40    /// Deletion simply refers to the history by ID
41    pub fn serialize(&self) -> Result<DecryptedData> {
42        // probably don't actually need to use rmp here, but if we ever need to extend it, it's a
43        // nice wrapper around raw byte stuff
44        use rmp::encode;
45
46        let mut output = vec![];
47
48        match self {
49            HistoryRecord::Create(history) => {
50                // 0 -> a history create
51                encode::write_u8(&mut output, 0)?;
52
53                let bytes = history.serialize()?;
54
55                encode::write_bin(&mut output, &bytes.0)?;
56            }
57            HistoryRecord::Delete(id) => {
58                // 1 -> a history delete
59                encode::write_u8(&mut output, 1)?;
60                encode::write_str(&mut output, id.0.as_str())?;
61            }
62        };
63
64        Ok(DecryptedData(output))
65    }
66
67    pub fn deserialize(bytes: &DecryptedData, version: &str) -> Result<Self> {
68        use rmp::decode;
69
70        fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report {
71            eyre!("{err:?}")
72        }
73
74        let mut bytes = Bytes::new(&bytes.0);
75
76        let record_type = decode::read_u8(&mut bytes).map_err(error_report)?;
77
78        match record_type {
79            // 0 -> HistoryRecord::Create
80            0 => {
81                // not super useful to us atm, but perhaps in the future
82                // written by write_bin above
83                let _ = decode::read_bin_len(&mut bytes).map_err(error_report)?;
84
85                let record = History::deserialize(bytes.remaining_slice(), version)?;
86
87                Ok(HistoryRecord::Create(record))
88            }
89
90            // 1 -> HistoryRecord::Delete
91            1 => {
92                let bytes = bytes.remaining_slice();
93                let (id, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;
94
95                if !bytes.is_empty() {
96                    bail!(
97                        "trailing bytes decoding HistoryRecord::Delete - malformed? got {bytes:?}"
98                    );
99                }
100
101                Ok(HistoryRecord::Delete(id.to_string().into()))
102            }
103
104            n => {
105                bail!("unknown HistoryRecord type {n}")
106            }
107        }
108    }
109}
110
111impl HistoryStore {
112    pub fn new(store: SqliteStore, host_id: HostId, encryption_key: [u8; 32]) -> Self {
113        HistoryStore {
114            store,
115            host_id,
116            encryption_key,
117        }
118    }
119
120    async fn push_record(&self, record: HistoryRecord) -> Result<(RecordId, RecordIdx)> {
121        let bytes = record.serialize()?;
122        let idx = self
123            .store
124            .last(self.host_id, HISTORY_TAG)
125            .await?
126            .map_or(0, |p| p.idx + 1);
127
128        let record = Record::builder()
129            .host(Host::new(self.host_id))
130            .version(HISTORY_VERSION.to_string())
131            .tag(HISTORY_TAG.to_string())
132            .idx(idx)
133            .data(bytes)
134            .build();
135
136        let id = record.id;
137
138        self.store
139            .push(&record.encrypt::<PASETO_V4>(&self.encryption_key))
140            .await?;
141
142        Ok((id, idx))
143    }
144
145    async fn push_batch(&self, records: impl Iterator<Item = HistoryRecord>) -> Result<()> {
146        let mut ret = Vec::new();
147
148        let idx = self
149            .store
150            .last(self.host_id, HISTORY_TAG)
151            .await?
152            .map_or(0, |p| p.idx + 1);
153
154        // Could probably _also_ do this as an iterator, but let's see how this is for now.
155        // optimizing for minimal sqlite transactions, this code can be optimised later
156        for (n, record) in records.enumerate() {
157            let bytes = record.serialize()?;
158
159            let record = Record::builder()
160                .host(Host::new(self.host_id))
161                .version(HISTORY_VERSION.to_string())
162                .tag(HISTORY_TAG.to_string())
163                .idx(idx + n as u64)
164                .data(bytes)
165                .build();
166
167            let record = record.encrypt::<PASETO_V4>(&self.encryption_key);
168
169            ret.push(record);
170        }
171
172        self.store.push_batch(ret.iter()).await?;
173
174        Ok(())
175    }
176
177    pub async fn delete(&self, id: HistoryId) -> Result<(RecordId, RecordIdx)> {
178        let record = HistoryRecord::Delete(id);
179
180        self.push_record(record).await
181    }
182
183    pub async fn push(&self, history: History) -> Result<(RecordId, RecordIdx)> {
184        // TODO(ellie): move the history store to its own file
185        // it's tiny rn so fine as is
186        let record = HistoryRecord::Create(history);
187
188        self.push_record(record).await
189    }
190
191    pub async fn history(&self) -> Result<Vec<HistoryRecord>> {
192        // Atm this loads all history into memory
193        // Not ideal as that is potentially quite a lot, although history will be small.
194        let records = self.store.all_tagged(HISTORY_TAG).await?;
195        let mut ret = Vec::with_capacity(records.len());
196
197        for record in records.into_iter() {
198            let hist = match record.version.as_str() {
199                HISTORY_VERSION => {
200                    let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
201
202                    HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)
203                }
204                version => bail!("unknown history version {version:?}"),
205            }?;
206
207            ret.push(hist);
208        }
209
210        Ok(ret)
211    }
212
213    pub async fn build(&self, database: &dyn Database) -> Result<()> {
214        // I'd like to change how we rebuild and not couple this with the database, but need to
215        // consider the structure more deeply. This will be easy to change.
216
217        // TODO(ellie): page or iterate this
218        let history = self.history().await?;
219
220        // In theory we could flatten this here
221        // The current issue is that the database may have history in it already, from the old sync
222        // This didn't actually delete old history
223        // If we're sure we have a DB only maintained by the new store, we can flatten
224        // create/delete before we even get to sqlite
225        let mut creates = Vec::new();
226        let mut deletes = Vec::new();
227
228        for i in history {
229            match i {
230                HistoryRecord::Create(h) => {
231                    creates.push(h);
232                }
233                HistoryRecord::Delete(id) => {
234                    deletes.push(id);
235                }
236            }
237        }
238
239        database.save_bulk(&creates).await?;
240        database.delete_rows(&deletes).await?;
241
242        Ok(())
243    }
244
245    pub async fn incremental_build(&self, database: &dyn Database, ids: &[RecordId]) -> Result<()> {
246        for id in ids {
247            let record = self.store.get(*id).await;
248
249            let record = if let Ok(record) = record {
250                record
251            } else {
252                continue;
253            };
254
255            if record.tag != HISTORY_TAG {
256                continue;
257            }
258
259            let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
260            let record = HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)?;
261
262            match record {
263                HistoryRecord::Create(h) => {
264                    // TODO: benchmark CPU time/memory tradeoff of batch commit vs one at a time
265                    database.save(&h).await?;
266                }
267                HistoryRecord::Delete(id) => {
268                    database.delete_rows(&[id]).await?;
269                }
270            }
271        }
272
273        Ok(())
274    }
275
276    /// Get a list of history IDs that exist in the store
277    /// Note: This currently involves loading all history into memory. This is not going to be a
278    /// large amount in absolute terms, but do not all it in a hot loop.
279    pub async fn history_ids(&self) -> Result<HashSet<HistoryId>> {
280        let history = self.history().await?;
281
282        let ret = HashSet::from_iter(history.iter().map(|h| match h {
283            HistoryRecord::Create(h) => h.id.clone(),
284            HistoryRecord::Delete(id) => id.clone(),
285        }));
286
287        Ok(ret)
288    }
289
290    pub async fn init_store(&self, db: &impl Database) -> Result<()> {
291        let pb = ProgressBar::new_spinner();
292        pb.set_style(
293            ProgressStyle::with_template("{spinner:.blue} {msg}")
294                .unwrap()
295                .with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
296                    write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()
297                })
298                .progress_chars("#>-"),
299        );
300        pb.enable_steady_tick(Duration::from_millis(500));
301
302        pb.set_message("Fetching history from old database");
303
304        let context = current_context();
305        let history = db.list(&[], &context, None, false, true).await?;
306
307        pb.set_message("Fetching history already in store");
308        let store_ids = self.history_ids().await?;
309
310        pb.set_message("Converting old history to new store");
311        let mut records = Vec::new();
312
313        for i in history {
314            debug!("loaded {}", i.id);
315
316            if store_ids.contains(&i.id) {
317                debug!("skipping {} - already exists", i.id);
318                continue;
319            }
320
321            if i.deleted_at.is_some() {
322                records.push(HistoryRecord::Delete(i.id));
323            } else {
324                records.push(HistoryRecord::Create(i));
325            }
326        }
327
328        pb.set_message("Writing to db");
329
330        if !records.is_empty() {
331            self.push_batch(records.into_iter()).await?;
332        }
333
334        pb.finish_with_message("Import complete");
335
336        Ok(())
337    }
338}
339
#[cfg(test)]
mod tests {
    use atuin_common::record::DecryptedData;
    use time::macros::datetime;

    use crate::history::{store::HistoryRecord, HISTORY_VERSION};

    use super::History;

    /// Checks `record` against its byte snapshot in both directions: it must
    /// serialize to exactly `snapshot`, and both the fresh encoding and the
    /// snapshot itself must decode back to `record`.
    fn assert_matches_snapshot(record: &HistoryRecord, snapshot: &[u8]) {
        let encoded = record.serialize().expect("failed to serialize history");
        assert_eq!(encoded.0, snapshot);

        let decoded = HistoryRecord::deserialize(&encoded, HISTORY_VERSION)
            .expect("failed to deserialize HistoryRecord");
        assert_eq!(&decoded, record);

        // check the snapshot too
        let from_snapshot =
            HistoryRecord::deserialize(&DecryptedData(snapshot.to_vec()), HISTORY_VERSION)
                .expect("failed to deserialize HistoryRecord");
        assert_eq!(&from_snapshot, record);
    }

    #[test]
    fn test_serialize_deserialize_create() {
        // Expected encoding of the create record built below.
        let snapshot = [
            204, 0, 196, 141, 205, 0, 0, 153, 217, 32, 48, 49, 56, 99, 100, 52, 102, 101, 56, 49,
            55, 53, 55, 99, 100, 50, 97, 101, 101, 54, 53, 99, 100, 55, 56, 54, 49, 102, 57, 99,
            56, 49, 207, 23, 166, 251, 212, 181, 82, 0, 0, 100, 0, 162, 108, 115, 217, 41, 47, 85,
            115, 101, 114, 115, 47, 101, 108, 108, 105, 101, 47, 115, 114, 99, 47, 103, 105, 116,
            104, 117, 98, 46, 99, 111, 109, 47, 97, 116, 117, 105, 110, 115, 104, 47, 97, 116, 117,
            105, 110, 217, 32, 48, 49, 56, 99, 100, 52, 102, 101, 97, 100, 56, 57, 55, 53, 57, 55,
            56, 53, 50, 53, 50, 55, 97, 51, 49, 99, 57, 57, 56, 48, 53, 57, 170, 98, 111, 111, 112,
            58, 101, 108, 108, 105, 101, 192,
        ];

        let record = HistoryRecord::Create(History {
            id: "018cd4fe81757cd2aee65cd7861f9c81".to_owned().into(),
            timestamp: datetime!(2024-01-04 00:00:00.000000 +00:00),
            duration: 100,
            exit: 0,
            command: "ls".to_owned(),
            cwd: "/Users/ellie/src/github.com/atuinsh/atuin".to_owned(),
            session: "018cd4fead897597852527a31c998059".to_owned(),
            hostname: "boop:ellie".to_owned(),
            deleted_at: None,
        });

        assert_matches_snapshot(&record, &snapshot);
    }

    #[test]
    fn test_serialize_deserialize_delete() {
        // Expected encoding of the delete record built below.
        let snapshot = [
            204, 1, 217, 32, 48, 49, 56, 99, 100, 52, 102, 101, 56, 49, 55, 53, 55, 99, 100, 50,
            97, 101, 101, 54, 53, 99, 100, 55, 56, 54, 49, 102, 57, 99, 56, 49,
        ];

        let record = HistoryRecord::Delete("018cd4fe81757cd2aee65cd7861f9c81".to_string().into());

        assert_matches_snapshot(&record, &snapshot);
    }
}