atuin_client/sync.rs

use std::collections::HashSet;
use std::iter::FromIterator;

use eyre::Result;

use atuin_common::api::AddHistoryRequest;
use crypto_secretbox::Key;
use time::OffsetDateTime;

use crate::{
    api_client,
    database::Database,
    encryption::{decrypt, encrypt, load_key},
    settings::Settings,
};

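/// SHA-256 a string and return the hex digest. Used below to hash hostnames
/// before they are uploaded.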
pub fn hash_str(string: &str) -> String {
    use sha2::{Digest, Sha256};
    let mut hasher = Sha256::new();
    hasher.update(string.as_bytes());
    hex::encode(hasher.finalize())
}

// Currently sync is kinda naive, and basically just pages backwards through
// history. This means newly added stuff shows up properly! We also just use
// the total count in each database to indicate whether a sync is needed.
// I think this could be massively improved! If we had a way of easily
// indicating the count per time period (hour, day, week, year, etc.) then we
// could easily pinpoint where we are missing data and what needs downloading.
// Start with the year, then find the week, then the day, then the hour, then
// download it all! The current naive approach will do for now.

// Check if remote has things we don't, and if so, download them.
// Returns (num downloaded, total local)
async fn sync_download(
    key: &Key,
    force: bool,
    client: &api_client::Client<'_>,
    db: &impl Database,
) -> Result<(i64, i64)> {
    debug!("starting sync download");

    let remote_status = client.status().await?;
    let remote_count = remote_status.count;

    // track remote deletions up front, so we never save an entry that was
    // deleted on the server before we ever synced it
    let remote_deleted =
        HashSet::<&str>::from_iter(remote_status.deleted.iter().map(String::as_str));

    let initial_local = db.history_count(true).await?;
    let mut local_count = initial_local;

    let mut last_sync = if force {
        OffsetDateTime::UNIX_EPOCH
    } else {
        Settings::last_sync()?
    };

    let mut last_timestamp = OffsetDateTime::UNIX_EPOCH;

    let host = if force { Some(String::from("")) } else { None };

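    // Keep fetching pages until our local count catches up with the remote
    // count, or until we receive a short page and bail out below.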
    while remote_count > local_count {
        let page = client
            .get_history(last_sync, last_timestamp, host.clone())
            .await?;

        let history: Vec<_> = page
            .history
            .iter()
            // TODO: handle deletion earlier in this chain
            .map(|h| serde_json::from_str(h).expect("invalid history json"))
            .map(|h| decrypt(h, key).expect("failed to decrypt history! check your key"))
            .map(|mut h| {
                if remote_deleted.contains(h.id.0.as_str()) {
                    h.deleted_at = Some(time::OffsetDateTime::now_utc());
                    h.command = String::from("");
                }

                h
            })
            .collect();

        db.save_bulk(&history).await?;

        local_count = db.history_count(true).await?;
        let remote_page_size = std::cmp::max(remote_status.page_size, 0) as usize;

        if history.len() < remote_page_size {
            break;
        }

        let page_last = history
            .last()
            .expect("could not get last element of page")
            .timestamp;

        // In the case of a small sync frequency, it's possible for history to
        // be "lost" between syncs. In this case we need to rewind the sync
        // timestamps.
        if page_last == last_timestamp {
            last_timestamp = OffsetDateTime::UNIX_EPOCH;
            last_sync -= time::Duration::hours(1);
        } else {
            last_timestamp = page_last;
        }
    }

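    // Apply remote deletions locally: anything the server reports as deleted
    // is deleted from the local store as well.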
    for i in remote_status.deleted {
        // we will update the stored history to have this data
        // pretty much everything can be nullified
        match db.load(i.as_str()).await? {
            Some(h) => {
                db.delete(h).await?;
            }
            _ => {
                info!(
                    "could not delete history with id {}, not found locally",
                    i.as_str()
                );
            }
        }
    }

    Ok((local_count - initial_local, local_count))
}

// Check if we have things remote doesn't, and if so, upload them
async fn sync_upload(
    key: &Key,
    _force: bool,
    client: &api_client::Client<'_>,
    db: &impl Database,
) -> Result<()> {
    debug!("starting sync upload");

    let remote_status = client.status().await?;
    let remote_deleted: HashSet<String> = HashSet::from_iter(remote_status.deleted.clone());

    let initial_remote_count = client.count().await?;
    let mut remote_count = initial_remote_count;

    let local_count = db.history_count(true).await?;

    debug!("remote has {}, we have {}", remote_count, local_count);

    // start the cursor at "now" and page backwards through local history
    let mut cursor = OffsetDateTime::now_utc();

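    // Upload pages of local history until the remote count catches up with ours.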
    while local_count > remote_count {
        let last = db.before(cursor, remote_status.page_size).await?;
        let mut buffer = Vec::new();

        if last.is_empty() {
            break;
        }

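        // encrypt each entry and wrap it in an AddHistoryRequest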
        for i in last {
            let data = encrypt(&i, key)?;
            let data = serde_json::to_string(&data)?;

            let add_hist = AddHistoryRequest {
                id: i.id.to_string(),
                timestamp: i.timestamp,
                data,
                hostname: hash_str(&i.hostname),
            };

            buffer.push(add_hist);
        }

        // upload this page and move the cursor for the next one
        client.post_history(&buffer).await?;
        cursor = buffer.last().unwrap().timestamp;
        remote_count = client.count().await?;

        debug!("upload cursor: {:?}", cursor);
    }

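    // push local deletions to the server, skipping anything it already knows
    // about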
    let deleted = db.deleted().await?;

    for i in deleted {
        if remote_deleted.contains(&i.id.to_string()) {
            continue;
        }

        info!("deleting {} on remote", i.id);
        client.delete_history(i).await?;
    }

    Ok(())
}

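/// Run a full sync against the configured server: upload anything the server
/// is missing, then download anything we are missing. Deletions are
/// propagated in both directions.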
pub async fn sync(settings: &Settings, force: bool, db: &impl Database) -> Result<()> {
    let client = api_client::Client::new(
        &settings.sync_address,
        settings.session_token()?.as_str(),
        settings.network_connect_timeout,
        settings.network_timeout,
    )?;

    Settings::save_sync_time()?;

    let key = load_key(settings)?; // encryption key

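    // upload local changes first, then pull down anything we are missing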
    sync_upload(&key, force, &client, db).await?;

    let download = sync_download(&key, force, &client, db).await?;

    debug!("sync downloaded {}", download.0);

    Ok(())
}