1use std::collections::HashSet;
2use std::convert::TryInto;
3use std::iter::FromIterator;
4
5use eyre::Result;
6
7use atuin_common::api::AddHistoryRequest;
8use crypto_secretbox::Key;
9use time::OffsetDateTime;
10
11use crate::{
12 api_client,
13 database::Database,
14 encryption::{decrypt, encrypt, load_key},
15 settings::Settings,
16};
17
18pub fn hash_str(string: &str) -> String {
19 use sha2::{Digest, Sha256};
20 let mut hasher = Sha256::new();
21 hasher.update(string.as_bytes());
22 hex::encode(hasher.finalize())
23}
24
25async fn sync_download(
37 key: &Key,
38 force: bool,
39 client: &api_client::Client<'_>,
40 db: &impl Database,
41) -> Result<(i64, i64)> {
42 debug!("starting sync download");
43
44 let remote_status = client.status().await?;
45 let remote_count = remote_status.count;
46
47 let remote_deleted =
49 HashSet::<&str>::from_iter(remote_status.deleted.iter().map(String::as_str));
50
51 let initial_local = db.history_count(true).await?;
52 let mut local_count = initial_local;
53
54 let mut last_sync = if force {
55 OffsetDateTime::UNIX_EPOCH
56 } else {
57 Settings::last_sync()?
58 };
59
60 let mut last_timestamp = OffsetDateTime::UNIX_EPOCH;
61
62 let host = if force { Some(String::from("")) } else { None };
63
64 while remote_count > local_count {
65 let page = client
66 .get_history(last_sync, last_timestamp, host.clone())
67 .await?;
68
69 let history: Vec<_> = page
70 .history
71 .iter()
72 .map(|h| serde_json::from_str(h).expect("invalid base64"))
74 .map(|h| decrypt(h, key).expect("failed to decrypt history! check your key"))
75 .map(|mut h| {
76 if remote_deleted.contains(h.id.0.as_str()) {
77 h.deleted_at = Some(time::OffsetDateTime::now_utc());
78 h.command = String::from("");
79 }
80
81 h
82 })
83 .collect();
84
85 db.save_bulk(&history).await?;
86
87 local_count = db.history_count(true).await?;
88
89 if history.len() < remote_status.page_size.try_into().unwrap() {
90 break;
91 }
92
93 let page_last = history
94 .last()
95 .expect("could not get last element of page")
96 .timestamp;
97
98 if page_last == last_timestamp {
102 last_timestamp = OffsetDateTime::UNIX_EPOCH;
103 last_sync -= time::Duration::hours(1);
104 } else {
105 last_timestamp = page_last;
106 }
107 }
108
109 for i in remote_status.deleted {
110 if let Some(h) = db.load(i.as_str()).await? {
113 db.delete(h).await?;
114 } else {
115 info!(
116 "could not delete history with id {}, not found locally",
117 i.as_str()
118 );
119 }
120 }
121
122 Ok((local_count - initial_local, local_count))
123}
124
125async fn sync_upload(
127 key: &Key,
128 _force: bool,
129 client: &api_client::Client<'_>,
130 db: &impl Database,
131) -> Result<()> {
132 debug!("starting sync upload");
133
134 let remote_status = client.status().await?;
135 let remote_deleted: HashSet<String> = HashSet::from_iter(remote_status.deleted.clone());
136
137 let initial_remote_count = client.count().await?;
138 let mut remote_count = initial_remote_count;
139
140 let local_count = db.history_count(true).await?;
141
142 debug!("remote has {}, we have {}", remote_count, local_count);
143
144 let mut cursor = OffsetDateTime::now_utc();
146
147 while local_count > remote_count {
148 let last = db.before(cursor, remote_status.page_size).await?;
149 let mut buffer = Vec::new();
150
151 if last.is_empty() {
152 break;
153 }
154
155 for i in last {
156 let data = encrypt(&i, key)?;
157 let data = serde_json::to_string(&data)?;
158
159 let add_hist = AddHistoryRequest {
160 id: i.id.to_string(),
161 timestamp: i.timestamp,
162 data,
163 hostname: hash_str(&i.hostname),
164 };
165
166 buffer.push(add_hist);
167 }
168
169 client.post_history(&buffer).await?;
171 cursor = buffer.last().unwrap().timestamp;
172 remote_count = client.count().await?;
173
174 debug!("upload cursor: {:?}", cursor);
175 }
176
177 let deleted = db.deleted().await?;
178
179 for i in deleted {
180 if remote_deleted.contains(&i.id.to_string()) {
181 continue;
182 }
183
184 info!("deleting {} on remote", i.id);
185 client.delete_history(i).await?;
186 }
187
188 Ok(())
189}
190
191pub async fn sync(settings: &Settings, force: bool, db: &impl Database) -> Result<()> {
192 let client = api_client::Client::new(
193 &settings.sync_address,
194 settings.session_token()?.as_str(),
195 settings.network_connect_timeout,
196 settings.network_timeout,
197 )?;
198
199 Settings::save_sync_time()?;
200
201 let key = load_key(settings)?; sync_upload(&key, force, &client, db).await?;
204
205 let download = sync_download(&key, force, &client, db).await?;
206
207 debug!("sync downloaded {}", download.0);
208
209 Ok(())
210}