cooklang_sync_client/
syncer.rs

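//! Synchronisation loops that keep the local file registry in step with a
//! remote server: the upload loop commits locally changed files and pushes
//! missing chunks, while the download loop fetches remote changes and writes
//! them back to disk.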
use futures::{channel::mpsc::Receiver, try_join, StreamExt};
use std::path::Path;

use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::Mutex;
use tokio::time::Duration;

use log::{debug, error, trace};

use crate::chunker::Chunker;
use crate::connection::{get_connection, ConnectionPool};
use crate::errors::SyncError;
use crate::models;
use crate::registry;
use crate::remote::{CommitResultStatus, Remote};

type Result<T, E = SyncError> = std::result::Result<T, E>;

const INTERVAL_CHECK_UPLOAD_SEC: Duration = Duration::from_secs(47);
// TODO this limit is duplicated elsewhere; keep the copies in sync
const MAX_UPLOAD_SIZE: usize = 3_000_000;

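/// Entry point for the syncer: drives the download loop and, unless
/// `read_only` is set, the upload loop concurrently, returning the first
/// error either of them produces.
///
/// A minimal sketch of a call site (the surrounding setup is assumed, not
/// part of this crate's documented API):
///
/// ```ignore
/// // `pool`, `chunker`, `remote` and `rx` are built elsewhere.
/// syncer::run(&pool, Path::new("/data"), 1, &mut chunker, &remote, rx, false).await?;
/// ```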
pub async fn run(
    pool: &ConnectionPool,
    storage_path: &Path,
    namespace_id: i32,
    chunker: &mut Chunker,
    remote: &Remote,
    local_registry_updated_rx: Receiver<models::IndexerUpdateEvent>,
    read_only: bool,
) -> Result<()> {
    let chunker = Arc::new(Mutex::new(chunker));

    if read_only {
        download_loop(
            pool,
            Arc::clone(&chunker),
            remote,
            storage_path,
            namespace_id,
        )
        .await?;
    } else {
        let _ = try_join!(
            download_loop(
                pool,
                Arc::clone(&chunker),
                remote,
                storage_path,
                namespace_id
            ),
            upload_loop(
                pool,
                Arc::clone(&chunker),
                remote,
                namespace_id,
                local_registry_updated_rx
            ),
        )?;
    }

    Ok(())
}

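/// Endlessly alternates between applying remote changes via
/// [`check_download_once`] and long-polling the remote for the next change
/// notification. Returns only on error; anything other than
/// `SyncError::Unauthorized` is collapsed into `SyncError::Unknown`.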
async fn download_loop(
    pool: &ConnectionPool,
    chunker: Arc<Mutex<&mut Chunker>>,
    remote: &Remote,
    storage_path: &Path,
    namespace_id: i32,
) -> Result<()> {
    loop {
        match check_download_once(
            pool,
            Arc::clone(&chunker),
            remote,
            storage_path,
            namespace_id,
        )
        .await
        {
            Ok(_) => {}
            Err(SyncError::Unauthorized) => return Err(SyncError::Unauthorized),
            Err(_) => return Err(SyncError::Unknown),
        };

        // Long-poll for the next change. The client's request timeout for
        // this call needs to be longer than the server's poll window, or we
        // would hit a client-side timeout error.
        remote.poll().await?;
    }
}

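/// Endlessly scans for locally updated files via [`check_upload_once`].
/// After a pass that committed everything, it sleeps until either
/// `INTERVAL_CHECK_UPLOAD_SEC` elapses or the indexer signals a local
/// change; after an incomplete pass it re-runs immediately.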
pub async fn upload_loop(
    pool: &ConnectionPool,
    chunker: Arc<Mutex<&mut Chunker>>,
    remote: &Remote,
    namespace_id: i32,
    mut local_registry_updated_rx: Receiver<models::IndexerUpdateEvent>,
) -> Result<()> {
    // Give the indexer a head start before the first scan.
    tokio::time::sleep(Duration::from_secs(5)).await;

    loop {
        // Wait only if everything was committed; if something is still
        // pending, re-run immediately.
        if check_upload_once(pool, Arc::clone(&chunker), remote, namespace_id).await? {
            // TODO test that this select doesn't cancel the stream
            tokio::select! {
                _ = tokio::time::sleep(INTERVAL_CHECK_UPLOAD_SEC) => {},
                Some(_) = local_registry_updated_rx.next() => {},
            };
        }
    }
}

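/// Performs a single upload pass: commits every locally updated file to the
/// remote and uploads any chunks the server reports missing, batched to stay
/// around `MAX_UPLOAD_SIZE` bytes per request. Returns `true` when every
/// file was fully committed (no chunks were missing).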
pub async fn check_upload_once(
    pool: &ConnectionPool,
    chunker: Arc<Mutex<&mut Chunker>>,
    remote: &Remote,
    namespace_id: i32,
) -> Result<bool> {
    debug!("upload scan");

    let conn = &mut get_connection(pool)?;
    let to_upload = registry::updated_locally(conn, namespace_id)?;

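    // Missing chunks are accumulated into batches; a new batch is started
    // once the current one grows past MAX_UPLOAD_SIZE, so a single batch can
    // overshoot the limit by at most one chunk.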
    let mut upload_queue: Vec<Vec<(String, Vec<u8>)>> = vec![vec![]];
    let mut size = 0;
    let mut last = upload_queue.last_mut().unwrap();
    let mut all_committed = true;

    for f in &to_upload {
        trace!("to upload {:?}", f);
        let mut chunker = chunker.lock().await;
        // A deleted file is committed with a single empty chunk id.
        let mut chunk_ids = vec![String::from("")];

        if !f.deleted {
            // Also warms up the chunk cache.
            chunk_ids = chunker.hashify(&f.path).await?;
        }

        let r = remote
            .commit(&f.path, f.deleted, &chunk_ids.join(","))
            .await?;

        match r {
            CommitResultStatus::Success(jid) => {
                trace!("commit success");
                registry::update_jid(conn, f, jid)?;
            }
            CommitResultStatus::NeedChunks(chunks) => {
                trace!("need chunks");

                all_committed = false;

                for c in chunks.split(',') {
                    let data = chunker.read_chunk(c)?;
                    size += data.len();
                    last.push((c.into(), data));

                    if size > MAX_UPLOAD_SIZE {
                        upload_queue.push(vec![]);
                        last = upload_queue.last_mut().unwrap();
                        size = 0;
                    }
                }
            }
        }
    }

    for batch in upload_queue {
        if !batch.is_empty() {
            remote.upload_batch(batch).await?;
        }
    }

    Ok(all_committed)
}

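/// Performs a single download pass: lists remote entries newer than the
/// latest known jid, downloads any chunks missing from the local cache in
/// one batch, then applies deletions and writes out updated files, mirroring
/// each change in the registry. Returns `true` if anything was fetched.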
pub async fn check_download_once(
    pool: &ConnectionPool,
    chunker: Arc<Mutex<&mut Chunker>>,
    remote: &Remote,
    storage_path: &Path,
    namespace_id: i32,
) -> Result<bool> {
    debug!("download scan");

    let conn = &mut get_connection(pool)?;

    let latest_local = registry::latest_jid(conn, namespace_id).unwrap_or(0);
    let to_download = remote.list(latest_local).await?;
    // TODO maybe limit to one download at a time and use batches;
    // a large download can also overflow the in-memory cache
    let mut download_queue: Vec<&str> = vec![];

    // First pass: collect every chunk we don't have locally.
    for d in &to_download {
        trace!("collecting needed chunks for {:?}", d);

        if d.deleted {
            continue;
        }

        let mut chunker = chunker.lock().await;

        // Warm up the cache so chunks from an old version of the file
        // count as present.
        if chunker.exists(&d.path) {
            chunker.hashify(&d.path).await?;
        }

        for c in d.chunk_ids.split(',') {
            if chunker.check_chunk(c) {
                continue;
            }

            download_queue.push(c);
        }
    }

    // Fetch all missing chunks in one batch and store them in the cache.
    if !download_queue.is_empty() {
        let mut chunker = chunker.lock().await;

        let mut downloaded = remote.download_batch(download_queue).await;

        while let Some(result) = downloaded.next().await {
            match result {
                Ok((chunk_id, data)) => {
                    chunker.save_chunk(&chunk_id, data)?;
                }
                Err(e) => {
                    return Err(e);
                }
            }
        }
    }

    // Second pass: apply deletions and materialise updated files on disk,
    // recording each change in the registry.
    for d in &to_download {
        trace!("updating downloaded files {:?}", d);

        let mut chunker = chunker.lock().await;

        if d.deleted {
            let form = build_delete_form(&d.path, storage_path, d.id, namespace_id);
            // TODO atomic?
            registry::delete(conn, &vec![form])?;
            if chunker.exists(&d.path) {
                chunker.delete(&d.path).await?;
            }
        } else {
            let chunks: Vec<&str> = d.chunk_ids.split(',').collect();
            // TODO atomic? store in tmp first and then move?
            // TODO should happen after we create the record in the db
            if let Err(e) = chunker.save(&d.path, chunks).await {
                error!("{:?}", e);
                return Err(e);
            }

            let form = build_file_record(&d.path, storage_path, d.id, namespace_id)?;
            registry::create(conn, &vec![form])?;
        }
    }

    Ok(!to_download.is_empty())
}

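/// Builds a `CreateForm` for a freshly downloaded file, reading its size and
/// modification time from disk under `base`.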
fn build_file_record(
    path: &str,
    base: &Path,
    jid: i32,
    namespace_id: i32,
) -> Result<models::CreateForm, SyncError> {
    let mut full_path = base.to_path_buf();
    full_path.push(path);
    let metadata = full_path
        .metadata()
        .map_err(|e| SyncError::from_io_error(path, e))?;
    let size: i64 = metadata.len().try_into()?;
    let time = metadata
        .modified()
        .map_err(|e| SyncError::from_io_error(path, e))?;
    let modified_at = OffsetDateTime::from(time);

    let form = models::CreateForm {
        jid: Some(jid),
        path: path.to_string(),
        deleted: false,
        size,
        modified_at,
        namespace_id,
    };

    Ok(form)
}

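/// Builds a `DeleteForm` tombstone for a remotely deleted file; `modified_at`
/// is the time the deletion is applied locally. `_base` is kept for
/// signature symmetry with `build_file_record` but is not needed here.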
fn build_delete_form(path: &str, _base: &Path, jid: i32, namespace_id: i32) -> models::DeleteForm {
    models::DeleteForm {
        path: path.to_string(),
        jid: Some(jid),
        deleted: true,
        size: 0,
        modified_at: OffsetDateTime::now_utc(),
        namespace_id,
    }
}
307}