1use futures::{channel::mpsc::Receiver, try_join, StreamExt};
2use std::path::Path;
3
4use std::sync::Arc;
5use time::OffsetDateTime;
6use tokio::sync::Mutex;
7use tokio::time::Duration;
8
9use log::{debug, error, trace};
10
11use crate::chunker::Chunker;
12use crate::connection::{get_connection, ConnectionPool};
13use crate::errors::SyncError;
14use crate::models;
15use crate::registry;
16use crate::remote::{CommitResultStatus, Remote};
17
18type Result<T, E = SyncError> = std::result::Result<T, E>;
19
20const INTERVAL_CHECK_UPLOAD_SEC: Duration = Duration::from_secs(47);
21const MAX_UPLOAD_SIZE: usize = 3_000_000;
23
24pub async fn run(
25 pool: &ConnectionPool,
26 storage_path: &Path,
27 namespace_id: i32,
28 chunker: &mut Chunker,
29 remote: &Remote,
30 local_registry_updated_rx: Receiver<models::IndexerUpdateEvent>,
31 read_only: bool,
32) -> Result<()> {
33 let chunker = Arc::new(Mutex::new(chunker));
34
35 if read_only {
36 let _ = try_join!(download_loop(
37 pool,
38 Arc::clone(&chunker),
39 remote,
40 storage_path,
41 namespace_id
42 ))?;
43 } else {
44 let _ = try_join!(
45 download_loop(
46 pool,
47 Arc::clone(&chunker),
48 remote,
49 storage_path,
50 namespace_id
51 ),
52 upload_loop(
53 pool,
54 Arc::clone(&chunker),
55 remote,
56 namespace_id,
57 local_registry_updated_rx
58 ),
59 )?;
60 }
61
62 Ok(())
63}
64
65async fn download_loop(
66 pool: &ConnectionPool,
67 chunker: Arc<Mutex<&mut Chunker>>,
68 remote: &Remote,
69 storage_path: &Path,
70 namespace_id: i32,
71) -> Result<()> {
72 loop {
73 match check_download_once(
74 pool,
75 Arc::clone(&chunker),
76 remote,
77 storage_path,
78 namespace_id,
79 )
80 .await
81 {
82 Ok(v) => v,
83 Err(SyncError::Unauthorized) => return Err(SyncError::Unauthorized),
84 Err(_) => return Err(SyncError::Unknown),
85 };
86
87 remote.poll().await?;
90 }
91}
92
93pub async fn upload_loop(
94 pool: &ConnectionPool,
95 chunker: Arc<Mutex<&mut Chunker>>,
96 remote: &Remote,
97 namespace_id: i32,
98 mut local_registry_updated_rx: Receiver<models::IndexerUpdateEvent>,
99) -> Result<()> {
100 tokio::time::sleep(Duration::from_secs(5)).await;
102
103 loop {
104 if check_upload_once(pool, Arc::clone(&chunker), remote, namespace_id).await? {
107 tokio::select! {
109 _ = tokio::time::sleep(INTERVAL_CHECK_UPLOAD_SEC) => {},
110 Some(_) = local_registry_updated_rx.next() => {},
111 };
112 }
113 }
114}
115
116pub async fn check_upload_once(
117 pool: &ConnectionPool,
118 chunker: Arc<Mutex<&mut Chunker>>,
119 remote: &Remote,
120 namespace_id: i32,
121) -> Result<bool> {
122 debug!("upload scan");
123
124 let conn = &mut get_connection(pool)?;
125 let to_upload = registry::updated_locally(conn, namespace_id)?;
126
127 let mut upload_queue: Vec<Vec<(String, Vec<u8>)>> = vec![vec![]];
128 let mut size = 0;
129 let mut last = upload_queue.last_mut().unwrap();
130 let mut all_commited = true;
131
132 for f in &to_upload {
133 trace!("to upload {:?}", f);
134 let mut chunker = chunker.lock().await;
135 let mut chunk_ids = vec![String::from("")];
136
137 if !f.deleted {
138 chunk_ids = chunker.hashify(&f.path).await?;
140 }
141
142 let r = remote
143 .commit(&f.path, f.deleted, &chunk_ids.join(","))
144 .await?;
145
146 match r {
147 CommitResultStatus::Success(jid) => {
148 trace!("commit success");
149 registry::update_jid(conn, f, jid)?;
150 }
151 CommitResultStatus::NeedChunks(chunks) => {
152 trace!("need chunks");
153
154 all_commited = false;
155
156 for c in chunks.split(',') {
157 let data = chunker.read_chunk(c)?;
158 size += data.len();
159 last.push((c.into(), data));
160
161 if size > MAX_UPLOAD_SIZE {
162 upload_queue.push(vec![]);
163 last = upload_queue.last_mut().unwrap();
164 size = 0;
165 }
166 }
167 }
168 }
169 }
170
171 for batch in upload_queue {
172 if !batch.is_empty() {
173 remote.upload_batch(batch).await?;
174 }
175 }
176
177 Ok(all_commited)
178}
179
180pub async fn check_download_once(
181 pool: &ConnectionPool,
182 chunker: Arc<Mutex<&mut Chunker>>,
183 remote: &Remote,
184 storage_path: &Path,
185 namespace_id: i32,
186) -> Result<bool> {
187 debug!("download scan");
188
189 let conn = &mut get_connection(pool)?;
190
191 let latest_local = registry::latest_jid(conn, namespace_id).unwrap_or(0);
192 let to_download = remote.list(latest_local).await?;
193 let mut download_queue: Vec<&str> = vec![];
196
197 for d in &to_download {
198 trace!("collecting needed chunks for {:?}", d);
199
200 if d.deleted {
201 continue;
202 }
203
204 let mut chunker = chunker.lock().await;
205
206 if chunker.exists(&d.path) {
208 chunker.hashify(&d.path).await?;
209 }
210
211 for c in d.chunk_ids.split(',') {
212 if chunker.check_chunk(c) {
213 continue;
214 }
215
216 download_queue.push(c);
217 }
218 }
219
220 if !download_queue.is_empty() {
221 let mut chunker = chunker.lock().await;
222
223 let mut downloaded = remote.download_batch(download_queue).await;
224
225 while let Some(result) = downloaded.next().await {
226 match result {
227 Ok((chunk_id, data)) => {
228 chunker.save_chunk(&chunk_id, data)?;
229 }
230 Err(e) => {
231 return Err(e);
232 }
233 }
234 }
235 }
236
237 for d in &to_download {
238 trace!("udpating downloaded files {:?}", d);
239
240 let mut chunker = chunker.lock().await;
241
242 if d.deleted {
243 let form = build_delete_form(&d.path, storage_path, d.id, namespace_id);
244 registry::delete(conn, &vec![form])?;
246 if chunker.exists(&d.path) {
247 chunker.delete(&d.path).await?;
248 }
249 } else {
250 let chunks: Vec<&str> = d.chunk_ids.split(',').collect();
251 if let Err(e) = chunker.save(&d.path, chunks).await {
254 error!("{:?}", e);
255 return Err(e);
256 }
257
258 let form = build_file_record(&d.path, storage_path, d.id, namespace_id)?;
259 registry::create(conn, &vec![form])?;
260 }
261 }
262
263 Ok(!to_download.is_empty())
264}
265
266fn build_file_record(
267 path: &str,
268 base: &Path,
269 jid: i32,
270 namespace_id: i32,
271) -> Result<models::CreateForm, SyncError> {
272 let mut full_path = base.to_path_buf();
273 full_path.push(path);
274 let metadata = full_path
275 .metadata()
276 .map_err(|e| SyncError::from_io_error(path, e))?;
277 let size: i64 = metadata.len().try_into()?;
278 let time = metadata
279 .modified()
280 .map_err(|e| SyncError::from_io_error(path, e))?;
281 let modified_at = OffsetDateTime::from(time);
282
283 let form = models::CreateForm {
284 jid: Some(jid),
285 path: path.to_string(),
286 deleted: false,
287 size,
288 modified_at,
289 namespace_id,
290 };
291
292 Ok(form)
293}
294
295fn build_delete_form(path: &str, base: &Path, jid: i32, namespace_id: i32) -> models::DeleteForm {
296 let mut full_path = base.to_path_buf();
297 full_path.push(path);
298
299 models::DeleteForm {
300 path: path.to_string(),
301 jid: Some(jid),
302 deleted: true,
303 size: 0,
304 modified_at: OffsetDateTime::now_utc(),
305 namespace_id,
306 }
307}