1use std::collections::HashSet;
2use std::iter::FromIterator;
3
4use eyre::Result;
5
6use atuin_common::api::AddHistoryRequest;
7use crypto_secretbox::Key;
8use time::OffsetDateTime;
9
10use crate::{
11 api_client,
12 database::Database,
13 encryption::{decrypt, encrypt, load_key},
14 settings::Settings,
15};
16
17pub fn hash_str(string: &str) -> String {
18 use sha2::{Digest, Sha256};
19 let mut hasher = Sha256::new();
20 hasher.update(string.as_bytes());
21 hex::encode(hasher.finalize())
22}
23
24async fn sync_download(
36 key: &Key,
37 force: bool,
38 client: &api_client::Client<'_>,
39 db: &impl Database,
40) -> Result<(i64, i64)> {
41 debug!("starting sync download");
42
43 let remote_status = client.status().await?;
44 let remote_count = remote_status.count;
45
46 let remote_deleted =
48 HashSet::<&str>::from_iter(remote_status.deleted.iter().map(String::as_str));
49
50 let initial_local = db.history_count(true).await?;
51 let mut local_count = initial_local;
52
53 let mut last_sync = if force {
54 OffsetDateTime::UNIX_EPOCH
55 } else {
56 Settings::last_sync()?
57 };
58
59 let mut last_timestamp = OffsetDateTime::UNIX_EPOCH;
60
61 let host = if force { Some(String::from("")) } else { None };
62
63 while remote_count > local_count {
64 let page = client
65 .get_history(last_sync, last_timestamp, host.clone())
66 .await?;
67
68 let history: Vec<_> = page
69 .history
70 .iter()
71 .map(|h| serde_json::from_str(h).expect("invalid base64"))
73 .map(|h| decrypt(h, key).expect("failed to decrypt history! check your key"))
74 .map(|mut h| {
75 if remote_deleted.contains(h.id.0.as_str()) {
76 h.deleted_at = Some(time::OffsetDateTime::now_utc());
77 h.command = String::from("");
78 }
79
80 h
81 })
82 .collect();
83
84 db.save_bulk(&history).await?;
85
86 local_count = db.history_count(true).await?;
87 let remote_page_size = std::cmp::max(remote_status.page_size, 0) as usize;
88
89 if history.len() < remote_page_size {
90 break;
91 }
92
93 let page_last = history
94 .last()
95 .expect("could not get last element of page")
96 .timestamp;
97
98 if page_last == last_timestamp {
102 last_timestamp = OffsetDateTime::UNIX_EPOCH;
103 last_sync -= time::Duration::hours(1);
104 } else {
105 last_timestamp = page_last;
106 }
107 }
108
109 for i in remote_status.deleted {
110 match db.load(i.as_str()).await? {
113 Some(h) => {
114 db.delete(h).await?;
115 }
116 _ => {
117 info!(
118 "could not delete history with id {}, not found locally",
119 i.as_str()
120 );
121 }
122 }
123 }
124
125 Ok((local_count - initial_local, local_count))
126}
127
128async fn sync_upload(
130 key: &Key,
131 _force: bool,
132 client: &api_client::Client<'_>,
133 db: &impl Database,
134) -> Result<()> {
135 debug!("starting sync upload");
136
137 let remote_status = client.status().await?;
138 let remote_deleted: HashSet<String> = HashSet::from_iter(remote_status.deleted.clone());
139
140 let initial_remote_count = client.count().await?;
141 let mut remote_count = initial_remote_count;
142
143 let local_count = db.history_count(true).await?;
144
145 debug!("remote has {}, we have {}", remote_count, local_count);
146
147 let mut cursor = OffsetDateTime::now_utc();
149
150 while local_count > remote_count {
151 let last = db.before(cursor, remote_status.page_size).await?;
152 let mut buffer = Vec::new();
153
154 if last.is_empty() {
155 break;
156 }
157
158 for i in last {
159 let data = encrypt(&i, key)?;
160 let data = serde_json::to_string(&data)?;
161
162 let add_hist = AddHistoryRequest {
163 id: i.id.to_string(),
164 timestamp: i.timestamp,
165 data,
166 hostname: hash_str(&i.hostname),
167 };
168
169 buffer.push(add_hist);
170 }
171
172 client.post_history(&buffer).await?;
174 cursor = buffer.last().unwrap().timestamp;
175 remote_count = client.count().await?;
176
177 debug!("upload cursor: {:?}", cursor);
178 }
179
180 let deleted = db.deleted().await?;
181
182 for i in deleted {
183 if remote_deleted.contains(&i.id.to_string()) {
184 continue;
185 }
186
187 info!("deleting {} on remote", i.id);
188 client.delete_history(i).await?;
189 }
190
191 Ok(())
192}
193
194pub async fn sync(settings: &Settings, force: bool, db: &impl Database) -> Result<()> {
195 let client = api_client::Client::new(
196 &settings.sync_address,
197 settings.session_token()?.as_str(),
198 settings.network_connect_timeout,
199 settings.network_timeout,
200 )?;
201
202 Settings::save_sync_time()?;
203
204 let key = load_key(settings)?; sync_upload(&key, force, &client, db).await?;
207
208 let download = sync_download(&key, force, &client, db).await?;
209
210 debug!("sync downloaded {}", download.0);
211
212 Ok(())
213}